Start to refactor and wire in frontend

This commit is contained in:
Rob Watson 2021-10-29 14:52:31 +02:00
parent 0e2fb5cd47
commit 281d5ce8a2
7 changed files with 238 additions and 86 deletions

View File

@ -1,10 +1,14 @@
package main package main
import ( import (
"context"
"log" "log"
"time" "time"
"git.netflux.io/rob/clipper/server" "git.netflux.io/rob/clipper/server"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/kkdai/youtube/v2"
) )
const ( const (
@ -13,9 +17,24 @@ const (
) )
func main() { func main() {
ctx := context.Background()
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// Create an Amazon S3 service s3Client
s3Client := s3.NewFromConfig(cfg)
// Create a Youtube client
var youtubeClient youtube.Client
serverOptions := server.Options{ serverOptions := server.Options{
BindAddr: DefaultHTTPBindAddr, BindAddr: DefaultHTTPBindAddr,
Timeout: DefaultTimeout, Timeout: DefaultTimeout,
YoutubeClient: &youtubeClient,
S3Client: s3Client,
} }
log.Fatal(server.Start(serverOptions)) log.Fatal(server.Start(serverOptions))

View File

@ -8,7 +8,7 @@ import (
) )
type FetchAudioProgress struct { type FetchAudioProgress struct {
percentComplete float32 PercentComplete float32
Peaks []int16 Peaks []int16
} }
@ -21,22 +21,24 @@ type FetchAudioProgressReader interface {
// signed int16s and, given a target number of bins, emits a stream of peaks // signed int16s and, given a target number of bins, emits a stream of peaks
// corresponding to each channel of the audio data. // corresponding to each channel of the audio data.
type fetchAudioProgressReader struct { type fetchAudioProgressReader struct {
framesExpected int64
channels int channels int
framesPerBin int framesPerBin int
samples []int16 samples []int16
currPeaks []int16 currPeaks []int16
currCount int currCount int
total int framesProcessed int
progress chan FetchAudioProgress progress chan FetchAudioProgress
errorChan chan error errorChan chan error
} }
// TODO: validate inputs, debugging is confusing otherwise // TODO: validate inputs, debugging is confusing otherwise
func newFetchAudioProgressReader(expFrames int64, channels, numBins int) *fetchAudioProgressReader { func newFetchAudioProgressReader(framesExpected int64, channels, numBins int) *fetchAudioProgressReader {
return &fetchAudioProgressReader{ return &fetchAudioProgressReader{
channels: channels, channels: channels,
framesPerBin: int(expFrames / int64(numBins)), framesExpected: framesExpected,
framesPerBin: int(framesExpected / int64(numBins)),
samples: make([]int16, 8_192), samples: make([]int16, 8_192),
currPeaks: make([]int16, channels), currPeaks: make([]int16, channels),
progress: make(chan FetchAudioProgress), progress: make(chan FetchAudioProgress),
@ -82,6 +84,8 @@ func (w *fetchAudioProgressReader) Write(p []byte) (int, error) {
} }
} }
w.framesProcessed += len(samples) / w.channels
return len(p), nil return len(p), nil
} }
@ -89,6 +93,7 @@ func (w *fetchAudioProgressReader) nextBin() {
var progress FetchAudioProgress var progress FetchAudioProgress
// TODO: avoid an allocation? // TODO: avoid an allocation?
progress.Peaks = append(progress.Peaks, w.currPeaks...) progress.Peaks = append(progress.Peaks, w.currPeaks...)
progress.PercentComplete = (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0
w.progress <- progress w.progress <- progress
@ -97,7 +102,7 @@ func (w *fetchAudioProgressReader) nextBin() {
for i := 0; i < len(w.currPeaks); i++ { for i := 0; i < len(w.currPeaks); i++ {
w.currPeaks[i] = 0 w.currPeaks[i] = 0
} }
w.total++ w.framesProcessed++
} }
func (w *fetchAudioProgressReader) Read() (FetchAudioProgress, error) { func (w *fetchAudioProgressReader) Read() (FetchAudioProgress, error) {
@ -107,7 +112,7 @@ func (w *fetchAudioProgressReader) Read() (FetchAudioProgress, error) {
if !ok { if !ok {
return FetchAudioProgress{}, io.EOF return FetchAudioProgress{}, io.EOF
} }
return FetchAudioProgress{Peaks: progress.Peaks}, nil return progress, nil
case err := <-w.errorChan: case err := <-w.errorChan:
return FetchAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err) return FetchAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err)
} }

View File

@ -21,6 +21,11 @@ const (
rawAudioSampleRate = 48_000 rawAudioSampleRate = 48_000
) )
const (
thumbnailWidth = 177 // 16:9
thumbnailHeight = 100 // "
)
// progressReader is a reader that prints progress logs as it reads. // progressReader is a reader that prints progress logs as it reads.
type progressReader struct { type progressReader struct {
io.Reader io.Reader
@ -75,42 +80,75 @@ func (s *FetchMediaSetService) Fetch(ctx context.Context, id string) (*MediaSet,
return nil, errors.New("no format available") return nil, errors.New("no format available")
} }
formats := FilterYoutubeAudio(video.Formats) audioMetadata, err := s.fetchAudioMetadata(ctx, video)
if len(video.Formats) == 0 {
return nil, errors.New("no format available")
}
format := formats[0]
sampleRate, err := strconv.Atoi(format.AudioSampleRate)
if err != nil { if err != nil {
return nil, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) return nil, fmt.Errorf("error fetching audio metadata: %v", err)
} }
videoMetadata, err := s.fetchVideoMetadata(ctx, video)
approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not parse audio duration: %s", err) return nil, fmt.Errorf("error fetching video metadata: %v", err)
} }
approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond
approxFrames := int64(approxDuration/time.Second) * int64(sampleRate)
mediaSet := MediaSet{ mediaSet := MediaSet{
ID: id, ID: id,
Audio: Audio{ Audio: audioMetadata,
// we need to decode it to be able to know bytes and frames exactly Video: videoMetadata,
ApproxFrames: approxFrames,
Channels: format.AudioChannels,
SampleRate: sampleRate,
},
} }
// TODO: video
// TODO: save to JSON // TODO: save to JSON
return &mediaSet, nil return &mediaSet, nil
} }
func (s *FetchMediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) {
formats := FilterYoutubeVideo(video.Formats)
if len(video.Formats) == 0 {
return Video{}, errors.New("no format available")
}
format := formats[0]
durationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return Video{}, fmt.Errorf("could not parse video duration: %s", err)
}
return Video{
Bytes: format.ContentLength,
ThumbnailWidth: thumbnailWidth,
ThumbnailHeight: thumbnailHeight,
Duration: time.Duration(durationMsecs) * time.Millisecond,
}, nil
}
func (s *FetchMediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) {
formats := FilterYoutubeAudio(video.Formats)
if len(video.Formats) == 0 {
return Audio{}, errors.New("no format available")
}
format := formats[0]
sampleRate, err := strconv.Atoi(format.AudioSampleRate)
if err != nil {
return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate)
}
approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return Audio{}, fmt.Errorf("could not parse audio duration: %s", err)
}
approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond
approxFrames := int64(approxDuration/time.Second) * int64(sampleRate)
return Audio{
// we need to decode it to be able to know bytes and frame counts exactly
ApproxFrames: approxFrames,
Channels: format.AudioChannels,
SampleRate: sampleRate,
}, nil
}
// FetchAudio fetches the audio part of a MediaSet. // FetchAudio fetches the audio part of a MediaSet.
func (s *FetchMediaSetService) FetchAudio(ctx context.Context, id string) (FetchAudioProgressReader, error) { func (s *FetchMediaSetService) FetchAudio(ctx context.Context, id string, numBins int) (FetchAudioProgressReader, error) {
mediaSet := NewMediaSet(id) mediaSet := NewMediaSet(id)
if !mediaSet.Exists() { if !mediaSet.Exists() {
// TODO check if audio uploaded already, don't bother again // TODO check if audio uploaded already, don't bother again

View File

@ -2,6 +2,7 @@ package server
import ( import (
"context" "context"
"io"
"log" "log"
"net/http" "net/http"
"os" "os"
@ -9,6 +10,7 @@ import (
pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set" pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set"
"git.netflux.io/rob/clipper/media" "git.netflux.io/rob/clipper/media"
"git.netflux.io/rob/clipper/youtube"
"github.com/improbable-eng/grpc-web/go/grpcweb" "github.com/improbable-eng/grpc-web/go/grpcweb"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
@ -18,18 +20,24 @@ import (
type Options struct { type Options struct {
BindAddr string BindAddr string
Timeout time.Duration Timeout time.Duration
YoutubeClient youtube.YoutubeClient
S3Client media.S3Client
} }
// mediaSetServiceController implements gRPC controller for MediaSetService const (
type mediaSetServiceController struct { fetchAudioTimeout = time.Minute * 5
pbMediaSet.UnimplementedMediaSetServiceServer )
mediaSetService *media.MediaSetService // fetchMediaSetServiceController implements gRPC controller for FetchMediaSetService
type fetchMediaSetServiceController struct {
pbMediaSet.UnimplementedFetchServiceServer
fetchMediaSetService *media.FetchMediaSetService
} }
// GetMediaSet returns a pbMediaSet.MediaSet // Fetch fetches a pbMediaSet.MediaSet
func (c *mediaSetServiceController) GetMediaSet(ctx context.Context, request *pbMediaSet.GetMediaSetRequest) (*pbMediaSet.MediaSet, error) { func (c *fetchMediaSetServiceController) Fetch(ctx context.Context, request *pbMediaSet.FetchRequest) (*pbMediaSet.MediaSet, error) {
mediaSet, err := c.mediaSetService.GetMediaSet(ctx, request.Source, request.Id) mediaSet, err := c.fetchMediaSetService.Fetch(ctx, request.GetId())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -53,17 +61,51 @@ func (c *mediaSetServiceController) GetMediaSet(ctx context.Context, request *pb
return &result, nil return &result, nil
} }
func (c *mediaSetServiceController) GetPeaks(*pbMediaSet.GetPeaksRequest, pbMediaSet.MediaSetService_GetPeaksServer) error { // TODO: wrap errors
func (c *fetchMediaSetServiceController) FetchAudio(request *pbMediaSet.FetchAudioRequest, stream pbMediaSet.FetchService_FetchAudioServer) error {
ctx, cancel := context.WithTimeout(context.Background(), fetchAudioTimeout)
defer cancel()
reader, err := c.fetchMediaSetService.FetchAudio(ctx, request.GetId(), int(request.GetNumBins()))
if err != nil {
return err
}
for {
progress, err := reader.Read()
if err != nil {
if err == io.EOF {
break
}
return err
}
// TODO: consider using int32 throughout the backend flow to avoid this.
peaks := make([]int32, len(progress.Peaks))
for i, p := range progress.Peaks {
peaks[i] = int32(p)
}
progressPb := pbMediaSet.FetchAudioProgress{
PercentCompleted: progress.PercentComplete,
Peaks: peaks,
}
stream.Send(&progressPb)
}
return nil return nil
} }
func Start(options Options) error { func Start(options Options) error {
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{}) fetchMediaSetService := media.NewFetchMediaSetService(options.YoutubeClient, options.S3Client)
pbMediaSet.RegisterFetchServiceServer(grpcServer, &fetchMediaSetServiceController{fetchMediaSetService: fetchMediaSetService})
grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags)) grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags))
// TODO: implement CORS // TODO: proper CORS support
grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true })) grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true }))
handler := func(w http.ResponseWriter, r *http.Request) { handler := func(w http.ResponseWriter, r *http.Request) {
grpcWebServer.ServeHTTP(w, r) grpcWebServer.ServeHTTP(w, r)

View File

@ -1,10 +1,13 @@
import { grpc } from '@improbable-eng/grpc-web'; import { grpc } from '@improbable-eng/grpc-web';
import { MediaSetService } from './generated/media_set_pb_service';
import { import {
MediaSet as MediaSetPb, MediaSet as MediaSetPb,
GetMediaSetRequest, FetchRequest,
FetchAudioRequest,
FetchAudioProgress,
} from './generated/media_set_pb'; } from './generated/media_set_pb';
import { FetchMediaSet, FetchMediaSetAudio } from './GrpcWrapper';
import { useState, useEffect } from 'react'; import { useState, useEffect } from 'react';
import { VideoPreview } from './VideoPreview'; import { VideoPreview } from './VideoPreview';
import { Overview } from './Overview'; import { Overview } from './Overview';
@ -13,6 +16,8 @@ import { ControlBar } from './ControlBar';
import { SeekBar } from './SeekBar'; import { SeekBar } from './SeekBar';
import './App.css'; import './App.css';
const grpcHost = 'http://localhost:8888';
// Audio corresponds to media.Audio. // Audio corresponds to media.Audio.
export interface Audio { export interface Audio {
bytes: number; bytes: number;
@ -60,31 +65,20 @@ function App(): JSX.Element {
// fetch mediaset on page load: // fetch mediaset on page load:
useEffect(() => { useEffect(() => {
(async function () { (async function () {
const request = new GetMediaSetRequest(); const request = new FetchRequest();
request.setId(videoID); request.setId(videoID);
request.setSource('youtube');
grpc.invoke(MediaSetService.GetMediaSet, { const mediaSet = await FetchMediaSet(grpcHost, request);
request: request, console.log('got media set:', mediaSet);
host: 'http://localhost:8888',
onMessage: (mediaSet: MediaSetPb) => { const handleProgress = (progress: FetchAudioProgress) => {
console.log('rcvd media set: ', mediaSet.toObject()); console.log('got progress', progress);
}, };
onEnd: (
code: grpc.Code, const audioRequest = new FetchAudioRequest();
msg: string | undefined, audioRequest.setId(videoID);
trailers: grpc.Metadata audioRequest.setNumBins(1000);
) => { FetchMediaSetAudio(grpcHost, audioRequest, handleProgress);
console.log(
'finished, got code',
code,
'msg',
msg,
'trailers',
trailers
);
},
});
// console.log('fetching media...'); // console.log('fetching media...');
// const resp = await fetch( // const resp = await fetch(

View File

@ -0,0 +1,55 @@
import { grpc } from '@improbable-eng/grpc-web';
import { FetchService } from './generated/media_set_pb_service';
import {
MediaSet,
FetchRequest,
FetchAudioProgress,
FetchAudioRequest,
} from './generated/media_set_pb';
export const FetchMediaSet = (
host: string,
request: FetchRequest
): Promise<MediaSet> => {
return new Promise<MediaSet>((resolve, reject) => {
let result: MediaSet;
grpc.invoke(FetchService.Fetch, {
host: host,
request: request,
onMessage: (mediaSet: MediaSet) => {
result = mediaSet;
},
onEnd: (
code: grpc.Code,
msg: string | undefined,
_trailers: grpc.Metadata
) => {
if (code != 0) {
reject(new Error(`unexpected grpc code: ${code}, message: ${msg}`));
return;
}
resolve(result);
},
});
});
};
export const FetchMediaSetAudio = (
host: string,
request: FetchAudioRequest,
onProgress: { (progress: FetchAudioProgress): void }
) => {
grpc.invoke(FetchService.FetchAudio, {
host: 'http://localhost:8888',
request: request,
onMessage: onProgress,
onEnd: (
code: grpc.Code,
msg: string | undefined,
trailers: grpc.Metadata
) => {
console.log('fetch audio request ended');
},
});
};

View File

@ -27,22 +27,21 @@ message MediaSet {
bool loaded = 4; bool loaded = 4;
}; };
message PeaksProgress { message FetchAudioProgress {
float percent_completed = 2; float percent_completed = 2;
repeated int32 peaks = 1; repeated int32 peaks = 1;
} }
message GetMediaSetRequest { message FetchRequest {
string id = 1; string id = 1;
string source = 2;
} }
message GetPeaksRequest { message FetchAudioRequest {
string id = 1; string id = 1;
int32 num_bins = 2; int32 num_bins = 2;
} }
service MediaSetService { service FetchService {
rpc GetMediaSet(GetMediaSetRequest) returns (MediaSet) {} rpc Fetch(FetchRequest) returns (MediaSet) {}
rpc GetPeaks(GetPeaksRequest) returns (stream PeaksProgress) {} rpc FetchAudio(FetchAudioRequest) returns (stream FetchAudioProgress) {}
} }