clipper/backend/media/fetch.go

232 lines
5.9 KiB
Go

package media
import (
"context"
"errors"
"fmt"
"io"
"log"
"strconv"
"time"
"github.com/aws/aws-sdk-go-v2/service/s3"
youtubev2 "github.com/kkdai/youtube/v2"
)
const s3Bucket = "clipper-development"
const (
rawAudioCodec = "pcm_s16le"
rawAudioFormat = "s16le"
rawAudioSampleRate = 48_000
)
// progressReader is a reader that prints progress logs as it reads.
type progressReader struct {
io.Reader
label string
total, exp int
}
func (pw *progressReader) Read(p []byte) (int, error) {
n, err := pw.Reader.Read(p)
pw.total += n
log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0)
return n, err
}
// S3Client wraps the AWS S3 service client.
type S3Client interface {
CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
}
// YoutubeClient wraps the youtube.Client client.
type YoutubeClient interface {
GetVideoContext(context.Context, string) (*youtubev2.Video, error)
GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error)
}
// FetchMediaSetService fetches a video via an io.Reader.
type FetchMediaSetService struct {
youtube YoutubeClient
s3 S3Client
}
func NewFetchMediaSetService(youtubeClient YoutubeClient, s3Client S3Client) *FetchMediaSetService {
return &FetchMediaSetService{
youtube: youtubeClient,
s3: s3Client,
}
}
// Fetch fetches the metadata for a given MediaSet source.
func (s *FetchMediaSetService) Fetch(ctx context.Context, id string) (*MediaSet, error) {
video, err := s.youtube.GetVideoContext(ctx, id)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
if len(video.Formats) == 0 {
return nil, errors.New("no format available")
}
// just the audio for now
// grab an audio stream from youtube
// TODO: avoid possible panic
format := SortYoutubeAudio(video.Formats)[0]
sampleRate, err := strconv.Atoi(format.AudioSampleRate)
if err != nil {
return nil, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate)
}
approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return nil, fmt.Errorf("could not parse audio duration: %s", err)
}
approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond
approxFrames := int64(approxDuration/time.Second) * int64(sampleRate)
mediaSet := MediaSet{
ID: id,
Audio: Audio{
// we need to decode it to be able to know bytes and frames exactly
ApproxFrames: approxFrames,
Channels: format.AudioChannels,
SampleRate: sampleRate,
},
}
// TODO: video
// TODO: save to JSON
return &mediaSet, nil
}
// FetchAudio fetches the audio part of a MediaSet.
func (s *FetchMediaSetService) FetchAudio(ctx context.Context, id string) (FetchAudioProgressReader, error) {
mediaSet := NewMediaSet(id)
if !mediaSet.Exists() {
// TODO check if audio uploaded already, don't bother again
return nil, errors.New("no media set found")
}
if err := mediaSet.Load(); err != nil {
return nil, fmt.Errorf("error loading media set: %v", err)
}
video, err := s.youtube.GetVideoContext(ctx, id)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
if len(video.Formats) == 0 {
return nil, errors.New("no format available")
}
// TODO: avoid possible panic
format := SortYoutubeAudio(video.Formats)[0]
stream, _, err := s.youtube.GetStreamContext(ctx, video, &format)
if err != nil {
return nil, fmt.Errorf("error fetching stream: %v", err)
}
// wrap it in a progress reader
progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)}
ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
if err != nil {
return nil, fmt.Errorf("error creating ffmpegreader: %v", err)
}
// set up uploader, this is writer 1
uploader, err := newMultipartUploadWriter(
ctx,
s.s3,
s3Bucket,
fmt.Sprintf("media_sets/%s/audio.webm", id),
"application/octet-stream",
)
if err != nil {
return nil, fmt.Errorf("error creating uploader: %v", err)
}
fetchAudioProgressReader := newFetchAudioProgressReader(
mediaSet.Audio.ApproxFrames,
format.AudioChannels,
100,
)
state := fetchAudioState{
fetchAudioProgressReader: fetchAudioProgressReader,
ffmpegReader: ffmpegReader,
uploader: uploader,
}
go state.run(ctx)
return &state, nil
}
type fetchAudioState struct {
*fetchAudioProgressReader
ffmpegReader io.ReadCloser
uploader *multipartUploadWriter
}
func (s *fetchAudioState) run(ctx context.Context) {
mw := io.MultiWriter(s, s.uploader)
done := make(chan error)
var err error
go func() {
_, copyErr := io.Copy(mw, s.ffmpegReader)
done <- copyErr
}()
outer:
for {
select {
case <-ctx.Done():
err = ctx.Err()
break outer
case err = <-done:
break outer
}
}
if readerErr := s.ffmpegReader.Close(); readerErr != nil {
if err == nil {
err = readerErr
}
}
if err == nil {
if uploaderErr := s.uploader.Complete(); uploaderErr != nil {
err = uploaderErr
}
}
if err != nil {
newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if abortUploadErr := s.uploader.Abort(newCtx); abortUploadErr != nil {
log.Printf("error aborting uploader: %v", abortUploadErr)
}
s.Abort(err)
return
}
if iterErr := s.Close(); iterErr != nil {
log.Printf("error closing peak iterator: %v", iterErr)
}
}