2021-10-22 19:30:09 +00:00
package media
2021-10-27 19:34:59 +00:00
import (
2021-11-04 06:13:00 +00:00
"bytes"
2021-10-27 19:34:59 +00:00
"context"
2021-11-01 05:28:40 +00:00
"database/sql"
2021-11-16 06:48:30 +00:00
"encoding/binary"
2021-10-27 19:34:59 +00:00
"errors"
"fmt"
2021-11-01 05:28:40 +00:00
"io"
2021-11-21 19:43:40 +00:00
"net/http"
2021-11-01 05:28:40 +00:00
"strconv"
"time"
2021-11-22 18:26:51 +00:00
"git.netflux.io/rob/clipper/config"
2021-11-01 05:28:40 +00:00
"git.netflux.io/rob/clipper/generated/store"
2021-11-02 18:03:26 +00:00
"github.com/aws/aws-sdk-go-v2/aws"
2021-11-20 18:29:34 +00:00
signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
2021-11-01 05:28:40 +00:00
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
2021-11-25 19:35:51 +00:00
"github.com/jackc/pgx/v4"
2021-11-01 05:28:40 +00:00
youtubev2 "github.com/kkdai/youtube/v2"
2021-11-16 06:48:30 +00:00
"go.uber.org/zap"
2021-11-01 05:28:40 +00:00
)
2021-11-20 18:29:34 +00:00
// Settings for video retrieval.
const (
// getVideoExpiresIn is the expiry passed to s3.WithPresignExpires when
// GetVideo presigns a download URL for an already-uploaded video.
getVideoExpiresIn = time . Hour * 1
)
2021-11-01 05:28:40 +00:00
// Parameters describing the raw audio that ffmpeg is asked to produce and
// that is subsequently uploaded.
const (
	// rawAudioCodec is passed to ffmpeg as the -acodec argument.
	rawAudioCodec = "pcm_s16le"
	// rawAudioFormat is passed to ffmpeg as the -f argument.
	rawAudioFormat = "s16le"
	// rawAudioSampleRate is passed to ffmpeg as the -ar argument.
	rawAudioSampleRate = 48000
	// rawAudioMimeType is the content type given to the uploader for raw audio.
	rawAudioMimeType = "audio/raw"
)
// Thumbnail dimensions reported in freshly fetched video metadata. The ratio
// is roughly 16:9.
const (
	thumbnailWidth  = 177
	thumbnailHeight = 100
)
2021-10-22 19:30:09 +00:00
2021-11-01 05:28:40 +00:00
// progressReader is a reader that prints progress logs as it reads.
type progressReader struct {
io . Reader
2021-11-22 20:35:51 +00:00
2021-11-01 05:28:40 +00:00
label string
2021-11-20 18:29:34 +00:00
total , exp int64
2021-11-22 20:35:51 +00:00
logger * zap . SugaredLogger
}
func newProgressReader ( reader io . Reader , label string , exp int64 , logger * zap . SugaredLogger ) * progressReader {
return & progressReader {
Reader : reader ,
exp : exp ,
logger : logger . Named ( fmt . Sprintf ( "ProgressReader %s" , label ) ) ,
}
2021-11-01 05:28:40 +00:00
}
2021-11-22 20:35:51 +00:00
func ( r * progressReader ) Read ( p [ ] byte ) ( int , error ) {
n , err := r . Reader . Read ( p )
r . total += int64 ( n )
2021-11-01 05:28:40 +00:00
2021-11-22 20:35:51 +00:00
r . logger . Debugf ( "Read %d of %d (%.02f%%) bytes from the provided reader" , r . total , r . exp , ( float32 ( r . total ) / float32 ( r . exp ) ) * 100.0 )
2021-11-01 05:28:40 +00:00
return n , err
}
// Store wraps a database store.
type Store interface {
2021-11-21 19:43:40 +00:00
GetMediaSet ( context . Context , uuid . UUID ) ( store . MediaSet , error )
GetMediaSetByYoutubeID ( context . Context , string ) ( store . MediaSet , error )
CreateMediaSet ( context . Context , store . CreateMediaSetParams ) ( store . MediaSet , error )
SetAudioUploaded ( context . Context , store . SetAudioUploadedParams ) ( store . MediaSet , error )
SetVideoUploaded ( context . Context , store . SetVideoUploadedParams ) ( store . MediaSet , error )
SetVideoThumbnailUploaded ( context . Context , store . SetVideoThumbnailUploadedParams ) ( store . MediaSet , error )
2021-11-20 18:29:34 +00:00
}
// S3API provides an API to AWS S3.
type S3API struct {
S3Client
S3PresignClient
2021-11-01 05:28:40 +00:00
}
// S3Client wraps the AWS S3 service client.
type S3Client interface {
2021-11-02 18:03:26 +00:00
GetObject ( context . Context , * s3 . GetObjectInput , ... func ( * s3 . Options ) ) ( * s3 . GetObjectOutput , error )
2021-11-01 05:28:40 +00:00
CreateMultipartUpload ( context . Context , * s3 . CreateMultipartUploadInput , ... func ( * s3 . Options ) ) ( * s3 . CreateMultipartUploadOutput , error )
UploadPart ( context . Context , * s3 . UploadPartInput , ... func ( * s3 . Options ) ) ( * s3 . UploadPartOutput , error )
2021-11-20 18:29:34 +00:00
AbortMultipartUpload ( context . Context , * s3 . AbortMultipartUploadInput , ... func ( * s3 . Options ) ) ( * s3 . AbortMultipartUploadOutput , error )
2021-11-01 05:28:40 +00:00
CompleteMultipartUpload ( context . Context , * s3 . CompleteMultipartUploadInput , ... func ( * s3 . Options ) ) ( * s3 . CompleteMultipartUploadOutput , error )
}
2021-11-20 18:29:34 +00:00
// S3PresignClient wraps the AWS S3 Presign client. Only the presign operation
// for GetObject is declared; GetVideo uses it to produce a download URL for a
// video that has already been uploaded.
type S3PresignClient interface {
// PresignGetObject returns a presigned HTTP request for the given GetObject input.
PresignGetObject ( context . Context , * s3 . GetObjectInput , ... func ( * s3 . PresignOptions ) ) ( * signerv4 . PresignedHTTPRequest , error )
}
2021-11-01 05:28:40 +00:00
// YoutubeClient wraps the youtube.Client client.
type YoutubeClient interface {
// GetVideoContext looks up a video; callers in this file pass a YouTube ID.
GetVideoContext ( context . Context , string ) ( * youtubev2 . Video , error )
// GetStreamContext opens a stream for one format of a video. The int64
// result is ignored by the callers in this file.
GetStreamContext ( context . Context , * youtubev2 . Video , * youtubev2 . Format ) ( io . ReadCloser , int64 , error )
}
// MediaSetService exposes logical flows handling MediaSets.
type MediaSetService struct {
store Store
youtube YoutubeClient
2021-11-20 18:29:34 +00:00
s3 S3API
2021-11-22 18:26:51 +00:00
config config . Config
2021-11-16 06:48:30 +00:00
logger * zap . SugaredLogger
2021-11-01 05:28:40 +00:00
}
2021-11-22 18:26:51 +00:00
func NewMediaSetService ( store Store , youtubeClient YoutubeClient , s3API S3API , config config . Config , logger * zap . Logger ) * MediaSetService {
2021-11-01 05:28:40 +00:00
return & MediaSetService {
store : store ,
youtube : youtubeClient ,
2021-11-20 18:29:34 +00:00
s3 : s3API ,
2021-11-22 18:26:51 +00:00
config : config ,
2021-11-16 06:48:30 +00:00
logger : logger . Sugar ( ) ,
2021-11-01 05:28:40 +00:00
}
}
2021-11-20 18:29:34 +00:00
// Get fetches the metadata for a given MediaSet source. If it does not exist
// in the local DB, it will attempt to create it.
2021-11-01 05:28:40 +00:00
func ( s * MediaSetService ) Get ( ctx context . Context , youtubeID string ) ( * MediaSet , error ) {
var (
mediaSet * MediaSet
err error
)
mediaSet , err = s . findMediaSet ( ctx , youtubeID )
if err != nil {
return nil , fmt . Errorf ( "error getting existing media set: %v" , err )
}
if mediaSet == nil {
mediaSet , err = s . createMediaSet ( ctx , youtubeID )
if err != nil {
return nil , fmt . Errorf ( "error getting new media set: %v" , err )
}
}
return mediaSet , nil
}
func ( s * MediaSetService ) createMediaSet ( ctx context . Context , youtubeID string ) ( * MediaSet , error ) {
video , err := s . youtube . GetVideoContext ( ctx , youtubeID )
if err != nil {
return nil , fmt . Errorf ( "error fetching video: %v" , err )
}
if len ( video . Formats ) == 0 {
return nil , errors . New ( "no format available" )
}
audioMetadata , err := s . fetchAudioMetadata ( ctx , video )
if err != nil {
return nil , fmt . Errorf ( "error fetching audio metadata: %v" , err )
}
videoMetadata , err := s . fetchVideoMetadata ( ctx , video )
if err != nil {
return nil , fmt . Errorf ( "error fetching video metadata: %v" , err )
}
2021-11-20 18:29:34 +00:00
storeParams := store . CreateMediaSetParams {
2021-11-01 05:28:40 +00:00
YoutubeID : youtubeID ,
AudioYoutubeItag : int32 ( audioMetadata . YoutubeItag ) ,
AudioChannels : int32 ( audioMetadata . Channels ) ,
AudioFramesApprox : audioMetadata . ApproxFrames ,
2021-11-02 16:20:47 +00:00
AudioSampleRate : int32 ( audioMetadata . SampleRate ) ,
2021-11-01 05:28:40 +00:00
AudioMimeTypeEncoded : audioMetadata . MimeType ,
VideoYoutubeItag : int32 ( videoMetadata . YoutubeItag ) ,
VideoMimeType : videoMetadata . MimeType ,
VideoDurationNanos : videoMetadata . Duration . Nanoseconds ( ) ,
}
2021-11-20 18:29:34 +00:00
mediaSet , err := s . store . CreateMediaSet ( ctx , storeParams )
2021-11-01 05:28:40 +00:00
if err != nil {
return nil , fmt . Errorf ( "error creating media set in store: %v" , err )
}
return & MediaSet {
2021-11-02 18:03:26 +00:00
ID : mediaSet . ID ,
2021-11-01 05:28:40 +00:00
YoutubeID : youtubeID ,
Audio : audioMetadata ,
Video : videoMetadata ,
} , nil
}
// findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist.
func ( s * MediaSetService ) findMediaSet ( ctx context . Context , youtubeID string ) ( * MediaSet , error ) {
mediaSet , err := s . store . GetMediaSetByYoutubeID ( ctx , youtubeID )
if err != nil {
2021-11-25 19:35:51 +00:00
if err == pgx . ErrNoRows {
2021-11-01 05:28:40 +00:00
return nil , nil
}
2021-11-25 19:35:51 +00:00
return nil , fmt . Errorf ( "error getting media set: %v" , err )
2021-11-01 05:28:40 +00:00
}
return & MediaSet {
2021-11-02 18:03:26 +00:00
ID : mediaSet . ID ,
YoutubeID : mediaSet . YoutubeID ,
2021-11-01 05:28:40 +00:00
Audio : Audio {
YoutubeItag : int ( mediaSet . AudioYoutubeItag ) ,
Bytes : 0 , // DEPRECATED
Channels : int ( mediaSet . AudioChannels ) ,
ApproxFrames : int64 ( mediaSet . AudioFramesApprox ) ,
2021-11-02 16:20:47 +00:00
Frames : mediaSet . AudioFrames . Int64 ,
SampleRate : int ( mediaSet . AudioSampleRate ) ,
2021-11-01 19:33:45 +00:00
MimeType : mediaSet . AudioMimeTypeEncoded ,
2021-11-01 05:28:40 +00:00
} ,
Video : Video {
YoutubeItag : int ( mediaSet . VideoYoutubeItag ) ,
Bytes : 0 , // DEPRECATED?
Duration : time . Duration ( mediaSet . VideoDurationNanos ) ,
2021-11-01 19:33:45 +00:00
MimeType : mediaSet . VideoMimeType ,
2021-11-01 05:28:40 +00:00
ThumbnailWidth : 0 , // ??
ThumbnailHeight : 0 , // ??
} ,
} , nil
}
// fetchVideoMetadata builds the Video metadata from the first format returned
// by FilterYoutubeVideo. It returns an error when no format survives the
// filter or when the format's approximate duration cannot be parsed.
func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) {
	formats := FilterYoutubeVideo(video.Formats)
	// Check the filtered list, not the unfiltered one: the filter may remove
	// every format, in which case formats[0] below would panic.
	if len(formats) == 0 {
		return Video{}, errors.New("no format available")
	}
	format := formats[0]

	durationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
	if err != nil {
		return Video{}, fmt.Errorf("could not parse video duration: %s", err)
	}

	return Video{
		YoutubeItag:     format.ItagNo,
		MimeType:        format.MimeType,
		Bytes:           format.ContentLength,
		ThumbnailWidth:  thumbnailWidth,
		ThumbnailHeight: thumbnailHeight,
		Duration:        time.Duration(durationMsecs) * time.Millisecond,
	}, nil
}
// fetchAudioMetadata builds the Audio metadata from the first format returned
// by FilterYoutubeAudio. It returns an error when no format survives the
// filter, or when the sample rate or approximate duration cannot be parsed.
//
// ApproxFrames is computed from the duration truncated to whole seconds
// multiplied by the sample rate.
func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) {
	formats := FilterYoutubeAudio(video.Formats)
	// Check the filtered list, not the unfiltered one: the filter may remove
	// every format, in which case formats[0] below would panic.
	if len(formats) == 0 {
		return Audio{}, errors.New("no format available")
	}
	format := formats[0]

	sampleRate, err := strconv.Atoi(format.AudioSampleRate)
	if err != nil {
		return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate)
	}

	approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
	if err != nil {
		return Audio{}, fmt.Errorf("could not parse audio duration: %s", err)
	}
	approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond
	approxFrames := int64(approxDuration/time.Second) * int64(sampleRate)

	return Audio{
		MimeType:     format.MimeType,
		YoutubeItag:  format.ItagNo,
		ApproxFrames: approxFrames,
		Channels:     format.AudioChannels,
		SampleRate:   sampleRate,
	}, nil
}
2021-11-20 18:29:34 +00:00
// GetVideo fetches the video part of a MediaSet.
func ( s * MediaSetService ) GetVideo ( ctx context . Context , id uuid . UUID ) ( GetVideoProgressReader , error ) {
mediaSet , err := s . store . GetMediaSet ( ctx , id )
if err != nil {
return nil , fmt . Errorf ( "error getting media set: %v" , err )
}
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt . Sprintf ( "media_sets/%s/video.mp4" , mediaSet . ID )
if mediaSet . VideoS3UploadedAt . Valid {
2021-11-22 18:26:51 +00:00
input := s3 . GetObjectInput { Bucket : aws . String ( s . config . S3Bucket ) , Key : aws . String ( s3Key ) }
2021-11-20 18:29:34 +00:00
request , signErr := s . s3 . PresignGetObject ( ctx , & input , s3 . WithPresignExpires ( getVideoExpiresIn ) )
if signErr != nil {
return nil , fmt . Errorf ( "error generating presigned URL: %v" , signErr )
}
videoGetter := videoGetterDownloaded ( request . URL )
return & videoGetter , nil
}
video , err := s . youtube . GetVideoContext ( ctx , mediaSet . YoutubeID )
if err != nil {
return nil , fmt . Errorf ( "error fetching video: %v" , err )
}
format := video . Formats . FindByItag ( int ( mediaSet . VideoYoutubeItag ) )
if format == nil {
return nil , fmt . Errorf ( "error finding itag: %v" , err )
}
stream , _ , err := s . youtube . GetStreamContext ( ctx , video , format )
if err != nil {
return nil , fmt . Errorf ( "error fetching stream: %v" , err )
}
videoGetter := newVideoGetter ( s . s3 , s . store , s . logger )
return videoGetter . GetVideo (
ctx ,
stream ,
format . ContentLength ,
mediaSet . ID ,
2021-11-22 18:26:51 +00:00
s . config . S3Bucket ,
2021-11-20 18:29:34 +00:00
s3Key ,
format . MimeType ,
)
}
// GetVideoProgressReader is the value returned by GetVideo. Callers obtain
// successive progress updates from it by calling Next.
type GetVideoProgressReader interface {
// Next returns the next video progress status. When the stream has finished,
// a valid GetVideoProgress value will be returned with io.EOF.
Next ( ) ( GetVideoProgress , error )
}
2021-11-01 05:28:40 +00:00
// GetAudio fetches the audio part of a MediaSet.
func ( s * MediaSetService ) GetAudio ( ctx context . Context , id uuid . UUID , numBins int ) ( GetAudioProgressReader , error ) {
mediaSet , err := s . store . GetMediaSet ( ctx , id )
if err != nil {
return nil , fmt . Errorf ( "error getting media set: %v" , err )
}
2021-11-02 18:03:26 +00:00
if mediaSet . AudioS3UploadedAt . Valid {
return s . getAudioFromS3 ( ctx , mediaSet , numBins )
}
return s . getAudioFromYoutube ( ctx , mediaSet , numBins )
}
func ( s * MediaSetService ) getAudioFromS3 ( ctx context . Context , mediaSet store . MediaSet , numBins int ) ( GetAudioProgressReader , error ) {
input := s3 . GetObjectInput {
Bucket : aws . String ( mediaSet . AudioS3Bucket . String ) ,
Key : aws . String ( mediaSet . AudioS3Key . String ) ,
}
output , err := s . s3 . GetObject ( ctx , & input )
if err != nil {
return nil , fmt . Errorf ( "error getting object from s3: %v" , err )
}
2021-11-20 18:29:34 +00:00
getAudioProgressReader , err := newGetAudioProgressReader (
2021-11-02 18:03:26 +00:00
int64 ( mediaSet . AudioFrames . Int64 ) ,
int ( mediaSet . AudioChannels ) ,
numBins ,
)
2021-11-20 18:29:34 +00:00
if err != nil {
return nil , fmt . Errorf ( "error creating audio reader: %v" , err )
}
2021-11-02 18:03:26 +00:00
state := getAudioFromS3State {
2021-11-12 12:36:26 +00:00
getAudioProgressReader : getAudioProgressReader ,
s3Reader : NewModuloBufReader ( output . Body , int ( mediaSet . AudioChannels ) * SizeOfInt16 ) ,
2021-11-22 20:35:51 +00:00
logger : s . logger ,
2021-11-02 18:03:26 +00:00
}
go state . run ( ctx )
return & state , nil
}
type getAudioFromS3State struct {
2021-11-12 12:36:26 +00:00
* getAudioProgressReader
2021-11-02 18:03:26 +00:00
s3Reader io . ReadCloser
2021-11-22 20:35:51 +00:00
logger * zap . SugaredLogger
2021-11-02 18:03:26 +00:00
}
func ( s * getAudioFromS3State ) run ( ctx context . Context ) {
done := make ( chan error )
var err error
go func ( ) {
_ , copyErr := io . Copy ( s , s . s3Reader )
done <- copyErr
} ( )
outer :
for {
select {
case <- ctx . Done ( ) :
err = ctx . Err ( )
break outer
case err = <- done :
break outer
}
}
if readerErr := s . s3Reader . Close ( ) ; readerErr != nil {
if err == nil {
err = readerErr
}
}
if err != nil {
2021-11-22 20:35:51 +00:00
s . logger . Errorf ( "getAudioFromS3State: error closing s3 reader: %v" , err )
2021-11-02 18:03:26 +00:00
s . Abort ( err )
return
}
if iterErr := s . Close ( ) ; iterErr != nil {
2021-11-22 20:35:51 +00:00
s . logger . Errorf ( "getAudioFromS3State: error closing progress iterator: %v" , iterErr )
2021-11-02 18:03:26 +00:00
}
}
func ( s * MediaSetService ) getAudioFromYoutube ( ctx context . Context , mediaSet store . MediaSet , numBins int ) ( GetAudioProgressReader , error ) {
2021-11-01 05:28:40 +00:00
video , err := s . youtube . GetVideoContext ( ctx , mediaSet . YoutubeID )
if err != nil {
return nil , fmt . Errorf ( "error fetching video: %v" , err )
}
format := video . Formats . FindByItag ( int ( mediaSet . AudioYoutubeItag ) )
if format == nil {
return nil , fmt . Errorf ( "error finding itag: %v" , err )
}
stream , _ , err := s . youtube . GetStreamContext ( ctx , video , format )
if err != nil {
return nil , fmt . Errorf ( "error fetching stream: %v" , err )
}
2021-10-22 19:30:09 +00:00
2021-11-22 20:35:51 +00:00
streamWithProgress := newProgressReader ( stream , "audio" , format . ContentLength , s . logger )
2021-10-27 19:34:59 +00:00
2021-11-22 20:35:51 +00:00
ffmpegReader , err := newFfmpegReader ( ctx , streamWithProgress , "-hide_banner" , "-loglevel" , "error" , "-i" , "-" , "-f" , rawAudioFormat , "-ar" , strconv . Itoa ( rawAudioSampleRate ) , "-acodec" , rawAudioCodec , "-" )
2021-11-01 05:28:40 +00:00
if err != nil {
return nil , fmt . Errorf ( "error creating ffmpegreader: %v" , err )
2021-10-27 19:34:59 +00:00
}
2021-11-20 18:29:34 +00:00
// TODO: use mediaSet func to fetch s3Key
2021-11-06 20:52:47 +00:00
s3Key := fmt . Sprintf ( "media_sets/%s/audio.raw" , mediaSet . ID )
2021-11-22 20:35:51 +00:00
uploader := newMultipartUploader ( s . s3 , s . logger )
2021-11-01 05:28:40 +00:00
2021-11-20 18:29:34 +00:00
getAudioProgressReader , err := newGetAudioProgressReader (
2021-11-01 05:28:40 +00:00
int64 ( mediaSet . AudioFramesApprox ) ,
format . AudioChannels ,
2021-11-02 18:03:26 +00:00
numBins ,
2021-11-01 05:28:40 +00:00
)
2021-11-20 18:29:34 +00:00
if err != nil {
return nil , fmt . Errorf ( "error creating audio reader: %v" , err )
}
2021-11-01 05:28:40 +00:00
2021-11-02 18:03:26 +00:00
state := getAudioFromYoutubeState {
2021-11-12 12:36:26 +00:00
getAudioProgressReader : getAudioProgressReader ,
ffmpegReader : ffmpegReader ,
uploader : uploader ,
2021-11-22 18:26:51 +00:00
s3Bucket : s . config . S3Bucket ,
2021-11-12 12:36:26 +00:00
s3Key : s3Key ,
store : s . store ,
channels : format . AudioChannels ,
2021-11-22 20:35:51 +00:00
logger : s . logger ,
2021-11-01 05:28:40 +00:00
}
2021-11-04 06:13:00 +00:00
go state . run ( ctx , mediaSet . ID )
2021-11-01 05:28:40 +00:00
return & state , nil
}
2021-11-02 18:03:26 +00:00
type getAudioFromYoutubeState struct {
2021-11-12 12:36:26 +00:00
* getAudioProgressReader
2021-11-01 05:28:40 +00:00
2021-11-08 13:56:25 +00:00
ffmpegReader * ffmpegReader
2021-11-12 07:20:34 +00:00
uploader * multipartUploader
2021-11-02 16:20:47 +00:00
s3Bucket , s3Key string
store Store
2021-11-12 12:36:26 +00:00
channels int
2021-11-22 20:35:51 +00:00
logger * zap . SugaredLogger
2021-11-01 05:28:40 +00:00
}
2021-11-04 06:13:00 +00:00
func ( s * getAudioFromYoutubeState ) run ( ctx context . Context , mediaSetID uuid . UUID ) {
2021-11-12 07:20:34 +00:00
teeReader := io . TeeReader ( s . ffmpegReader , s )
bytesUploaded , err := s . uploader . Upload ( ctx , teeReader , s . s3Bucket , s . s3Key , rawAudioMimeType )
2021-11-10 12:37:25 +00:00
// If there was an error returned, the underlying ffmpegReader process may
// still be active. Kill it.
2021-11-08 13:56:25 +00:00
if err != nil {
if cancelErr := s . ffmpegReader . Cancel ( ) ; cancelErr != nil {
2021-11-22 20:35:51 +00:00
s . logger . Errorf ( "getAudioFromYoutubeState: error cancelling ffmpegreader: %v" , cancelErr )
2021-11-08 13:56:25 +00:00
}
}
2021-11-12 07:20:34 +00:00
// Either way, we need to wait for the ffmpegReader process to exit,
// and ensure there is no error.
2021-11-01 05:28:40 +00:00
if readerErr := s . ffmpegReader . Close ( ) ; readerErr != nil {
if err == nil {
err = readerErr
}
}
2021-11-02 16:20:47 +00:00
if err == nil {
_ , updateErr := s . store . SetAudioUploaded ( ctx , store . SetAudioUploadedParams {
2021-11-04 06:13:00 +00:00
ID : mediaSetID ,
2021-11-02 16:20:47 +00:00
AudioS3Bucket : sqlString ( s . s3Bucket ) ,
AudioS3Key : sqlString ( s . s3Key ) ,
2021-11-12 12:36:26 +00:00
AudioFrames : sqlInt64 ( bytesUploaded / SizeOfInt16 / int64 ( s . channels ) ) ,
2021-11-02 16:20:47 +00:00
} )
if updateErr != nil {
err = updateErr
}
}
2021-11-01 05:28:40 +00:00
if err != nil {
2021-11-22 20:35:51 +00:00
s . logger . Errorf ( "getAudioFromYoutubeState: error uploading asynchronously: %v" , err )
2021-11-08 01:54:43 +00:00
2021-11-01 05:28:40 +00:00
s . Abort ( err )
return
}
if iterErr := s . Close ( ) ; iterErr != nil {
2021-11-22 20:35:51 +00:00
s . logger . Errorf ( "getAudioFromYoutubeState: error closing progress iterator: %v" , iterErr )
2021-11-01 05:28:40 +00:00
}
2021-10-22 19:30:09 +00:00
}
2021-11-02 16:20:47 +00:00
2021-11-17 17:53:27 +00:00
func ( s * MediaSetService ) GetAudioSegment ( ctx context . Context , id uuid . UUID , startFrame , endFrame int64 , numBins int ) ( [ ] int16 , error ) {
2021-11-16 06:48:30 +00:00
mediaSet , err := s . store . GetMediaSet ( ctx , id )
if err != nil {
return nil , fmt . Errorf ( "error getting media set: %v" , err )
}
byteRange := fmt . Sprintf (
"bytes=%d-%d" ,
startFrame * int64 ( mediaSet . AudioChannels ) * SizeOfInt16 ,
endFrame * int64 ( mediaSet . AudioChannels ) * SizeOfInt16 ,
)
input := s3 . GetObjectInput {
Bucket : aws . String ( mediaSet . AudioS3Bucket . String ) ,
Key : aws . String ( mediaSet . AudioS3Key . String ) ,
Range : aws . String ( byteRange ) ,
}
output , err := s . s3 . GetObject ( ctx , & input )
if err != nil {
return nil , fmt . Errorf ( "error getting object from s3: %v" , err )
}
defer output . Body . Close ( )
2021-11-17 17:53:27 +00:00
const readBufSizeBytes = 8_192
2021-11-16 06:48:30 +00:00
channels := int ( mediaSet . AudioChannels )
2021-11-17 17:53:27 +00:00
modReader := NewModuloBufReader ( output . Body , channels * SizeOfInt16 )
readBuf := make ( [ ] byte , readBufSizeBytes )
peaks := make ( [ ] int16 , channels * numBins )
2021-11-16 06:48:30 +00:00
totalFrames := endFrame - startFrame
framesPerBin := totalFrames / int64 ( numBins )
2021-11-17 17:53:27 +00:00
sampleBuf := make ( [ ] int16 , readBufSizeBytes / SizeOfInt16 )
bytesExpected := ( endFrame - startFrame ) * int64 ( channels ) * SizeOfInt16
2021-11-16 06:48:30 +00:00
2021-11-17 17:53:27 +00:00
var (
bytesRead int64
closing bool
currPeakIndex int
currFrame int64
)
2021-11-16 06:48:30 +00:00
for {
2021-11-17 17:53:27 +00:00
n , err := modReader . Read ( readBuf )
if err == io . EOF {
closing = true
} else if err != nil {
2021-11-16 06:48:30 +00:00
return nil , fmt . Errorf ( "read error: %v" , err )
}
2021-11-17 17:53:27 +00:00
bytesRead += int64 ( n )
samples := sampleBuf [ : n / SizeOfInt16 ]
if err := binary . Read ( bytes . NewReader ( readBuf [ : n ] ) , binary . LittleEndian , samples ) ; err != nil {
2021-11-16 06:48:30 +00:00
return nil , fmt . Errorf ( "error interpreting samples: %v" , err )
}
for i := 0 ; i < len ( samples ) ; i += channels {
for j := 0 ; j < channels ; j ++ {
2021-11-17 17:53:27 +00:00
samp := sampleBuf [ i + j ]
2021-11-16 06:48:30 +00:00
if samp < 0 {
samp = - samp
}
2021-11-17 17:53:27 +00:00
if samp > peaks [ currPeakIndex + j ] {
peaks [ currPeakIndex + j ] = samp
2021-11-16 06:48:30 +00:00
}
}
2021-11-17 17:53:27 +00:00
2021-11-16 06:48:30 +00:00
if currFrame == framesPerBin {
currFrame = 0
2021-11-17 17:53:27 +00:00
currPeakIndex += channels
} else {
currFrame ++
2021-11-16 06:48:30 +00:00
}
}
2021-11-17 17:53:27 +00:00
if closing {
break
}
}
if bytesRead < bytesExpected {
2021-11-22 20:35:51 +00:00
s . logger . With ( "startFrame" , startFrame , "endFrame" , endFrame , "got" , bytesRead , "want" , bytesExpected , "key" , mediaSet . AudioS3Key . String ) . Warn ( "short read from S3" )
2021-11-16 06:48:30 +00:00
}
return peaks , nil
}
2021-11-02 16:20:47 +00:00
// sqlString wraps s in a sql.NullString that is marked valid.
func sqlString(s string) sql.NullString {
	var wrapped sql.NullString
	wrapped.String, wrapped.Valid = s, true
	return wrapped
}
2021-11-02 18:03:26 +00:00
// sqlInt64 wraps i in a sql.NullInt64 that is marked valid.
func sqlInt64(i int64) sql.NullInt64 {
	var wrapped sql.NullInt64
	wrapped.Int64, wrapped.Valid = i, true
	return wrapped
}
2021-11-04 06:13:00 +00:00
2021-11-21 19:43:40 +00:00
// sqlInt32 wraps i in a sql.NullInt32 that is marked valid.
func sqlInt32(i int32) sql.NullInt32 {
	var wrapped sql.NullInt32
	wrapped.Int32, wrapped.Valid = i, true
	return wrapped
}
2021-11-16 06:48:30 +00:00
// ModuloBufReader reads from a reader in block sizes that are exactly modulo
// modSize, with any remainder buffered until the next read.
type ModuloBufReader struct {
	io.ReadCloser

	buf     bytes.Buffer // bytes held back from the previous Read
	modSize int
}

// NewModuloBufReader returns a ModuloBufReader over r whose reads return a
// multiple of modSize bytes. A modSize of 1 or less disables the alignment
// and reads are passed straight through.
func NewModuloBufReader(r io.ReadCloser, modSize int) *ModuloBufReader {
	return &ModuloBufReader{ReadCloser: r, modSize: modSize}
}

// Read fills p from the buffered remainder followed by the underlying reader
// and returns the largest multiple of modSize bytes available, holding back
// the rest for the next call.
//
// At io.EOF everything read is returned, including any unaligned tail. On any
// other error the aligned prefix is returned together with the error, and the
// remainder stays buffered rather than being dropped. A non-empty p shorter
// than modSize yields io.ErrShortBuffer, because an aligned block could never
// be returned through it.
func (r *ModuloBufReader) Read(p []byte) (int, error) {
	// No alignment requested (this also avoids a division by zero below).
	if r.modSize <= 1 {
		return r.ReadCloser.Read(p)
	}
	if len(p) == 0 {
		return 0, nil
	}
	if len(p) < r.modSize {
		return 0, io.ErrShortBuffer
	}

	// The buffer holds fewer than modSize bytes, so this always drains it.
	// The error is always io.EOF or nil and carries no information.
	nr1, _ := r.buf.Read(p)
	nr2, err := r.ReadCloser.Read(p[nr1:])
	nr := nr1 + nr2

	// At end of stream there is no later read to complete a block, so hand
	// back everything.
	if err == io.EOF {
		return nr, err
	}

	// Hold back any remainder, also when a non-EOF error occurred, so that no
	// bytes are lost if the caller reads again.
	rem := nr % r.modSize
	if rem != 0 {
		// Write on a bytes.Buffer always returns a nil error.
		_, _ = r.buf.Write(p[nr-rem : nr])
	}

	return nr - rem, err
}
2021-11-21 19:43:40 +00:00
// VideoThumbnail holds the image bytes of a video thumbnail together with its
// dimensions. It is the value returned by GetVideoThumbnail.
type VideoThumbnail struct {
// Data is the image content as read from S3 or from the thumbnail URL.
Data [ ] byte
// Width and Height are the thumbnail's recorded dimensions.
Width , Height int
}
// GetVideoThumbnail returns the thumbnail for a MediaSet. A thumbnail that has
// already been uploaded is read back from S3; otherwise it is fetched from
// YouTube.
func (s *MediaSetService) GetVideoThumbnail(ctx context.Context, id uuid.UUID) (VideoThumbnail, error) {
	mediaSet, err := s.store.GetMediaSet(ctx, id)
	if err != nil {
		return VideoThumbnail{}, fmt.Errorf("error getting media set: %v", err)
	}

	if !mediaSet.VideoThumbnailS3UploadedAt.Valid {
		return s.getThumbnailFromYoutube(ctx, mediaSet)
	}
	return s.getThumbnailFromS3(ctx, mediaSet)
}
// getThumbnailFromS3 downloads the stored thumbnail object in full and pairs
// it with the dimensions recorded on the media set.
func (s *MediaSetService) getThumbnailFromS3(ctx context.Context, mediaSet store.MediaSet) (VideoThumbnail, error) {
	bucket := aws.String(mediaSet.VideoThumbnailS3Bucket.String)
	key := aws.String(mediaSet.VideoThumbnailS3Key.String)

	object, err := s.s3.GetObject(ctx, &s3.GetObjectInput{Bucket: bucket, Key: key})
	if err != nil {
		return VideoThumbnail{}, fmt.Errorf("error fetching thumbnail from s3: %v", err)
	}
	defer object.Body.Close()

	data, err := io.ReadAll(object.Body)
	if err != nil {
		return VideoThumbnail{}, fmt.Errorf("error reading thumbnail from s3: %v", err)
	}

	var thumb VideoThumbnail
	thumb.Data = data
	thumb.Width = int(mediaSet.VideoThumbnailWidth.Int32)
	thumb.Height = int(mediaSet.VideoThumbnailHeight.Int32)
	return thumb, nil
}
func ( s * MediaSetService ) getThumbnailFromYoutube ( ctx context . Context , mediaSet store . MediaSet ) ( VideoThumbnail , error ) {
video , err := s . youtube . GetVideoContext ( ctx , mediaSet . YoutubeID )
if err != nil {
return VideoThumbnail { } , fmt . Errorf ( "error fetching video: %v" , err )
}
if len ( video . Formats ) == 0 {
return VideoThumbnail { } , errors . New ( "no format available" )
}
thumbnails := video . Thumbnails
SortYoutubeThumbnails ( thumbnails )
thumbnail := thumbnails [ 0 ]
resp , err := http . Get ( thumbnail . URL )
if err != nil {
return VideoThumbnail { } , fmt . Errorf ( "error fetching thumbnail: %v" , err )
}
defer resp . Body . Close ( )
imageData , err := io . ReadAll ( resp . Body )
if err != nil {
return VideoThumbnail { } , fmt . Errorf ( "error reading thumbnail: %v" , err )
}
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt . Sprintf ( "media_sets/%s/thumbnail.jpg" , mediaSet . ID )
2021-11-22 20:35:51 +00:00
uploader := newMultipartUploader ( s . s3 , s . logger )
2021-11-21 19:43:40 +00:00
const mimeType = "application/jpeg"
2021-11-22 18:26:51 +00:00
_ , err = uploader . Upload ( ctx , bytes . NewReader ( imageData ) , s . config . S3Bucket , s3Key , mimeType )
2021-11-21 19:43:40 +00:00
if err != nil {
return VideoThumbnail { } , fmt . Errorf ( "error uploading thumbnail: %v" , err )
}
storeParams := store . SetVideoThumbnailUploadedParams {
ID : mediaSet . ID ,
VideoThumbnailMimeType : sqlString ( mimeType ) ,
2021-11-22 18:26:51 +00:00
VideoThumbnailS3Bucket : sqlString ( s . config . S3Bucket ) ,
2021-11-21 19:43:40 +00:00
VideoThumbnailS3Key : sqlString ( s3Key ) ,
VideoThumbnailWidth : sqlInt32 ( int32 ( thumbnail . Width ) ) ,
VideoThumbnailHeight : sqlInt32 ( int32 ( thumbnail . Height ) ) ,
}
if _ , err := s . store . SetVideoThumbnailUploaded ( ctx , storeParams ) ; err != nil {
return VideoThumbnail { } , fmt . Errorf ( "error updating media set: %v" , err )
}
return VideoThumbnail { Width : int ( thumbnail . Width ) , Height : int ( thumbnail . Height ) , Data : imageData } , nil
}