clipper/backend/media/service.go

743 lines
21 KiB
Go
Raw Normal View History

2021-10-22 19:30:09 +00:00
package media
2021-10-27 19:34:59 +00:00
import (
"bytes"
2021-10-27 19:34:59 +00:00
"context"
2021-11-01 05:28:40 +00:00
"database/sql"
2021-11-16 06:48:30 +00:00
"encoding/binary"
2021-10-27 19:34:59 +00:00
"errors"
"fmt"
2021-11-01 05:28:40 +00:00
"io"
2021-11-21 19:43:40 +00:00
"net/http"
2021-11-01 05:28:40 +00:00
"strconv"
"time"
2021-11-22 18:26:51 +00:00
"git.netflux.io/rob/clipper/config"
2021-11-01 05:28:40 +00:00
"git.netflux.io/rob/clipper/generated/store"
2021-11-02 18:03:26 +00:00
"github.com/aws/aws-sdk-go-v2/aws"
signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
2021-11-01 05:28:40 +00:00
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
youtubev2 "github.com/kkdai/youtube/v2"
2021-11-16 06:48:30 +00:00
"go.uber.org/zap"
2021-11-01 05:28:40 +00:00
)
const (
getVideoExpiresIn = time.Hour * 1
)
2021-11-01 05:28:40 +00:00
const (
rawAudioCodec = "pcm_s16le"
rawAudioFormat = "s16le"
rawAudioSampleRate = 48_000
2021-11-02 16:20:47 +00:00
rawAudioMimeType = "audio/raw"
2021-11-01 05:28:40 +00:00
)
const (
thumbnailWidth = 177 // 16:9
thumbnailHeight = 100 // "
2021-10-27 19:34:59 +00:00
)
2021-10-22 19:30:09 +00:00
2021-11-01 05:28:40 +00:00
// progressReader is a reader that prints progress logs as it reads.
type progressReader struct {
io.Reader
2021-11-22 20:35:51 +00:00
2021-11-01 05:28:40 +00:00
label string
total, exp int64
2021-11-22 20:35:51 +00:00
logger *zap.SugaredLogger
}
func newProgressReader(reader io.Reader, label string, exp int64, logger *zap.SugaredLogger) *progressReader {
return &progressReader{
Reader: reader,
exp: exp,
logger: logger.Named(fmt.Sprintf("ProgressReader %s", label)),
}
2021-11-01 05:28:40 +00:00
}
2021-11-22 20:35:51 +00:00
func (r *progressReader) Read(p []byte) (int, error) {
n, err := r.Reader.Read(p)
r.total += int64(n)
2021-11-01 05:28:40 +00:00
2021-11-22 20:35:51 +00:00
r.logger.Debugf("Read %d of %d (%.02f%%) bytes from the provided reader", r.total, r.exp, (float32(r.total)/float32(r.exp))*100.0)
2021-11-01 05:28:40 +00:00
return n, err
}
// Store wraps a database store.
type Store interface {
2021-11-21 19:43:40 +00:00
GetMediaSet(context.Context, uuid.UUID) (store.MediaSet, error)
GetMediaSetByYoutubeID(context.Context, string) (store.MediaSet, error)
CreateMediaSet(context.Context, store.CreateMediaSetParams) (store.MediaSet, error)
SetAudioUploaded(context.Context, store.SetAudioUploadedParams) (store.MediaSet, error)
SetVideoUploaded(context.Context, store.SetVideoUploadedParams) (store.MediaSet, error)
SetVideoThumbnailUploaded(context.Context, store.SetVideoThumbnailUploadedParams) (store.MediaSet, error)
}
// S3API provides an API to AWS S3.
type S3API struct {
S3Client
S3PresignClient
2021-11-01 05:28:40 +00:00
}
// S3Client wraps the AWS S3 service client.
type S3Client interface {
2021-11-02 18:03:26 +00:00
GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
2021-11-01 05:28:40 +00:00
CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
2021-11-01 05:28:40 +00:00
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
}
// S3PresignClient wraps the AWS S3 Presign client.
type S3PresignClient interface {
PresignGetObject(context.Context, *s3.GetObjectInput, ...func(*s3.PresignOptions)) (*signerv4.PresignedHTTPRequest, error)
}
2021-11-01 05:28:40 +00:00
// YoutubeClient wraps the youtube.Client client.
type YoutubeClient interface {
GetVideoContext(context.Context, string) (*youtubev2.Video, error)
GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error)
}
// MediaSetService exposes logical flows handling MediaSets.
type MediaSetService struct {
store Store
youtube YoutubeClient
s3 S3API
2021-11-22 18:26:51 +00:00
config config.Config
2021-11-16 06:48:30 +00:00
logger *zap.SugaredLogger
2021-11-01 05:28:40 +00:00
}
2021-11-22 18:26:51 +00:00
func NewMediaSetService(store Store, youtubeClient YoutubeClient, s3API S3API, config config.Config, logger *zap.Logger) *MediaSetService {
2021-11-01 05:28:40 +00:00
return &MediaSetService{
store: store,
youtube: youtubeClient,
s3: s3API,
2021-11-22 18:26:51 +00:00
config: config,
2021-11-16 06:48:30 +00:00
logger: logger.Sugar(),
2021-11-01 05:28:40 +00:00
}
}
// Get fetches the metadata for a given MediaSet source. If it does not exist
// in the local DB, it will attempt to create it.
2021-11-01 05:28:40 +00:00
func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) {
var (
mediaSet *MediaSet
err error
)
mediaSet, err = s.findMediaSet(ctx, youtubeID)
if err != nil {
return nil, fmt.Errorf("error getting existing media set: %v", err)
}
if mediaSet == nil {
mediaSet, err = s.createMediaSet(ctx, youtubeID)
if err != nil {
return nil, fmt.Errorf("error getting new media set: %v", err)
}
}
return mediaSet, nil
}
func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) {
video, err := s.youtube.GetVideoContext(ctx, youtubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
if len(video.Formats) == 0 {
return nil, errors.New("no format available")
}
audioMetadata, err := s.fetchAudioMetadata(ctx, video)
if err != nil {
return nil, fmt.Errorf("error fetching audio metadata: %v", err)
}
videoMetadata, err := s.fetchVideoMetadata(ctx, video)
if err != nil {
return nil, fmt.Errorf("error fetching video metadata: %v", err)
}
storeParams := store.CreateMediaSetParams{
2021-11-01 05:28:40 +00:00
YoutubeID: youtubeID,
AudioYoutubeItag: int32(audioMetadata.YoutubeItag),
AudioChannels: int32(audioMetadata.Channels),
AudioFramesApprox: audioMetadata.ApproxFrames,
2021-11-02 16:20:47 +00:00
AudioSampleRate: int32(audioMetadata.SampleRate),
2021-11-01 05:28:40 +00:00
AudioMimeTypeEncoded: audioMetadata.MimeType,
VideoYoutubeItag: int32(videoMetadata.YoutubeItag),
VideoMimeType: videoMetadata.MimeType,
VideoDurationNanos: videoMetadata.Duration.Nanoseconds(),
}
mediaSet, err := s.store.CreateMediaSet(ctx, storeParams)
2021-11-01 05:28:40 +00:00
if err != nil {
return nil, fmt.Errorf("error creating media set in store: %v", err)
}
return &MediaSet{
2021-11-02 18:03:26 +00:00
ID: mediaSet.ID,
2021-11-01 05:28:40 +00:00
YoutubeID: youtubeID,
Audio: audioMetadata,
Video: videoMetadata,
}, nil
}
// findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist.
func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) {
mediaSet, err := s.store.GetMediaSetByYoutubeID(ctx, youtubeID)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, fmt.Errorf("error getting existing media set: %v", err)
}
return &MediaSet{
2021-11-02 18:03:26 +00:00
ID: mediaSet.ID,
YoutubeID: mediaSet.YoutubeID,
2021-11-01 05:28:40 +00:00
Audio: Audio{
YoutubeItag: int(mediaSet.AudioYoutubeItag),
Bytes: 0, // DEPRECATED
Channels: int(mediaSet.AudioChannels),
ApproxFrames: int64(mediaSet.AudioFramesApprox),
2021-11-02 16:20:47 +00:00
Frames: mediaSet.AudioFrames.Int64,
SampleRate: int(mediaSet.AudioSampleRate),
2021-11-01 19:33:45 +00:00
MimeType: mediaSet.AudioMimeTypeEncoded,
2021-11-01 05:28:40 +00:00
},
Video: Video{
YoutubeItag: int(mediaSet.VideoYoutubeItag),
Bytes: 0, // DEPRECATED?
Duration: time.Duration(mediaSet.VideoDurationNanos),
2021-11-01 19:33:45 +00:00
MimeType: mediaSet.VideoMimeType,
2021-11-01 05:28:40 +00:00
ThumbnailWidth: 0, // ??
ThumbnailHeight: 0, // ??
},
}, nil
}
func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) {
formats := FilterYoutubeVideo(video.Formats)
if len(video.Formats) == 0 {
return Video{}, errors.New("no format available")
}
format := formats[0]
durationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return Video{}, fmt.Errorf("could not parse video duration: %s", err)
}
return Video{
YoutubeItag: format.ItagNo,
MimeType: format.MimeType,
Bytes: format.ContentLength,
ThumbnailWidth: thumbnailWidth,
ThumbnailHeight: thumbnailHeight,
Duration: time.Duration(durationMsecs) * time.Millisecond,
}, nil
}
func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) {
formats := FilterYoutubeAudio(video.Formats)
if len(video.Formats) == 0 {
return Audio{}, errors.New("no format available")
}
format := formats[0]
sampleRate, err := strconv.Atoi(format.AudioSampleRate)
if err != nil {
return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate)
}
approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return Audio{}, fmt.Errorf("could not parse audio duration: %s", err)
}
approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond
approxFrames := int64(approxDuration/time.Second) * int64(sampleRate)
return Audio{
MimeType: format.MimeType,
YoutubeItag: format.ItagNo,
ApproxFrames: approxFrames,
Channels: format.AudioChannels,
SampleRate: sampleRate,
}, nil
}
// GetVideo fetches the video part of a MediaSet.
func (s *MediaSetService) GetVideo(ctx context.Context, id uuid.UUID) (GetVideoProgressReader, error) {
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {
return nil, fmt.Errorf("error getting media set: %v", err)
}
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/video.mp4", mediaSet.ID)
if mediaSet.VideoS3UploadedAt.Valid {
2021-11-22 18:26:51 +00:00
input := s3.GetObjectInput{Bucket: aws.String(s.config.S3Bucket), Key: aws.String(s3Key)}
request, signErr := s.s3.PresignGetObject(ctx, &input, s3.WithPresignExpires(getVideoExpiresIn))
if signErr != nil {
return nil, fmt.Errorf("error generating presigned URL: %v", signErr)
}
videoGetter := videoGetterDownloaded(request.URL)
return &videoGetter, nil
}
video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
format := video.Formats.FindByItag(int(mediaSet.VideoYoutubeItag))
if format == nil {
return nil, fmt.Errorf("error finding itag: %v", err)
}
stream, _, err := s.youtube.GetStreamContext(ctx, video, format)
if err != nil {
return nil, fmt.Errorf("error fetching stream: %v", err)
}
videoGetter := newVideoGetter(s.s3, s.store, s.logger)
return videoGetter.GetVideo(
ctx,
stream,
format.ContentLength,
mediaSet.ID,
2021-11-22 18:26:51 +00:00
s.config.S3Bucket,
s3Key,
format.MimeType,
)
}
type GetVideoProgressReader interface {
// Next returns the next video progress status. When the stream has finished,
// a valid GetVideoProgress value will be returned with io.EOF.
Next() (GetVideoProgress, error)
}
2021-11-01 05:28:40 +00:00
// GetAudio fetches the audio part of a MediaSet.
func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) {
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {
return nil, fmt.Errorf("error getting media set: %v", err)
}
2021-11-02 18:03:26 +00:00
if mediaSet.AudioS3UploadedAt.Valid {
return s.getAudioFromS3(ctx, mediaSet, numBins)
}
return s.getAudioFromYoutube(ctx, mediaSet, numBins)
}
func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) {
input := s3.GetObjectInput{
Bucket: aws.String(mediaSet.AudioS3Bucket.String),
Key: aws.String(mediaSet.AudioS3Key.String),
}
output, err := s.s3.GetObject(ctx, &input)
if err != nil {
return nil, fmt.Errorf("error getting object from s3: %v", err)
}
getAudioProgressReader, err := newGetAudioProgressReader(
2021-11-02 18:03:26 +00:00
int64(mediaSet.AudioFrames.Int64),
int(mediaSet.AudioChannels),
numBins,
)
if err != nil {
return nil, fmt.Errorf("error creating audio reader: %v", err)
}
2021-11-02 18:03:26 +00:00
state := getAudioFromS3State{
2021-11-12 12:36:26 +00:00
getAudioProgressReader: getAudioProgressReader,
s3Reader: NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16),
2021-11-22 20:35:51 +00:00
logger: s.logger,
2021-11-02 18:03:26 +00:00
}
go state.run(ctx)
return &state, nil
}
type getAudioFromS3State struct {
2021-11-12 12:36:26 +00:00
*getAudioProgressReader
2021-11-02 18:03:26 +00:00
s3Reader io.ReadCloser
2021-11-22 20:35:51 +00:00
logger *zap.SugaredLogger
2021-11-02 18:03:26 +00:00
}
func (s *getAudioFromS3State) run(ctx context.Context) {
done := make(chan error)
var err error
go func() {
_, copyErr := io.Copy(s, s.s3Reader)
done <- copyErr
}()
outer:
for {
select {
case <-ctx.Done():
err = ctx.Err()
break outer
case err = <-done:
break outer
}
}
if readerErr := s.s3Reader.Close(); readerErr != nil {
if err == nil {
err = readerErr
}
}
if err != nil {
2021-11-22 20:35:51 +00:00
s.logger.Errorf("getAudioFromS3State: error closing s3 reader: %v", err)
2021-11-02 18:03:26 +00:00
s.Abort(err)
return
}
if iterErr := s.Close(); iterErr != nil {
2021-11-22 20:35:51 +00:00
s.logger.Errorf("getAudioFromS3State: error closing progress iterator: %v", iterErr)
2021-11-02 18:03:26 +00:00
}
}
func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) {
2021-11-01 05:28:40 +00:00
video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag))
if format == nil {
return nil, fmt.Errorf("error finding itag: %v", err)
}
stream, _, err := s.youtube.GetStreamContext(ctx, video, format)
if err != nil {
return nil, fmt.Errorf("error fetching stream: %v", err)
}
2021-10-22 19:30:09 +00:00
2021-11-22 20:35:51 +00:00
streamWithProgress := newProgressReader(stream, "audio", format.ContentLength, s.logger)
2021-10-27 19:34:59 +00:00
2021-11-22 20:35:51 +00:00
ffmpegReader, err := newFfmpegReader(ctx, streamWithProgress, "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
2021-11-01 05:28:40 +00:00
if err != nil {
return nil, fmt.Errorf("error creating ffmpegreader: %v", err)
2021-10-27 19:34:59 +00:00
}
// TODO: use mediaSet func to fetch s3Key
2021-11-06 20:52:47 +00:00
s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
2021-11-22 20:35:51 +00:00
uploader := newMultipartUploader(s.s3, s.logger)
2021-11-01 05:28:40 +00:00
getAudioProgressReader, err := newGetAudioProgressReader(
2021-11-01 05:28:40 +00:00
int64(mediaSet.AudioFramesApprox),
format.AudioChannels,
2021-11-02 18:03:26 +00:00
numBins,
2021-11-01 05:28:40 +00:00
)
if err != nil {
return nil, fmt.Errorf("error creating audio reader: %v", err)
}
2021-11-01 05:28:40 +00:00
2021-11-02 18:03:26 +00:00
state := getAudioFromYoutubeState{
2021-11-12 12:36:26 +00:00
getAudioProgressReader: getAudioProgressReader,
ffmpegReader: ffmpegReader,
uploader: uploader,
2021-11-22 18:26:51 +00:00
s3Bucket: s.config.S3Bucket,
2021-11-12 12:36:26 +00:00
s3Key: s3Key,
store: s.store,
channels: format.AudioChannels,
2021-11-22 20:35:51 +00:00
logger: s.logger,
2021-11-01 05:28:40 +00:00
}
go state.run(ctx, mediaSet.ID)
2021-11-01 05:28:40 +00:00
return &state, nil
}
2021-11-02 18:03:26 +00:00
type getAudioFromYoutubeState struct {
2021-11-12 12:36:26 +00:00
*getAudioProgressReader
2021-11-01 05:28:40 +00:00
ffmpegReader *ffmpegReader
uploader *multipartUploader
2021-11-02 16:20:47 +00:00
s3Bucket, s3Key string
store Store
2021-11-12 12:36:26 +00:00
channels int
2021-11-22 20:35:51 +00:00
logger *zap.SugaredLogger
2021-11-01 05:28:40 +00:00
}
func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID) {
teeReader := io.TeeReader(s.ffmpegReader, s)
bytesUploaded, err := s.uploader.Upload(ctx, teeReader, s.s3Bucket, s.s3Key, rawAudioMimeType)
// If there was an error returned, the underlying ffmpegReader process may
// still be active. Kill it.
if err != nil {
if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil {
2021-11-22 20:35:51 +00:00
s.logger.Errorf("getAudioFromYoutubeState: error cancelling ffmpegreader: %v", cancelErr)
}
}
// Either way, we need to wait for the ffmpegReader process to exit,
// and ensure there is no error.
2021-11-01 05:28:40 +00:00
if readerErr := s.ffmpegReader.Close(); readerErr != nil {
if err == nil {
err = readerErr
}
}
2021-11-02 16:20:47 +00:00
if err == nil {
_, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{
ID: mediaSetID,
2021-11-02 16:20:47 +00:00
AudioS3Bucket: sqlString(s.s3Bucket),
AudioS3Key: sqlString(s.s3Key),
2021-11-12 12:36:26 +00:00
AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(s.channels)),
2021-11-02 16:20:47 +00:00
})
if updateErr != nil {
err = updateErr
}
}
2021-11-01 05:28:40 +00:00
if err != nil {
2021-11-22 20:35:51 +00:00
s.logger.Errorf("getAudioFromYoutubeState: error uploading asynchronously: %v", err)
2021-11-08 01:54:43 +00:00
2021-11-01 05:28:40 +00:00
s.Abort(err)
return
}
if iterErr := s.Close(); iterErr != nil {
2021-11-22 20:35:51 +00:00
s.logger.Errorf("getAudioFromYoutubeState: error closing progress iterator: %v", iterErr)
2021-11-01 05:28:40 +00:00
}
2021-10-22 19:30:09 +00:00
}
2021-11-02 16:20:47 +00:00
func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([]int16, error) {
2021-11-16 06:48:30 +00:00
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {
return nil, fmt.Errorf("error getting media set: %v", err)
}
byteRange := fmt.Sprintf(
"bytes=%d-%d",
startFrame*int64(mediaSet.AudioChannels)*SizeOfInt16,
endFrame*int64(mediaSet.AudioChannels)*SizeOfInt16,
)
input := s3.GetObjectInput{
Bucket: aws.String(mediaSet.AudioS3Bucket.String),
Key: aws.String(mediaSet.AudioS3Key.String),
Range: aws.String(byteRange),
}
output, err := s.s3.GetObject(ctx, &input)
if err != nil {
return nil, fmt.Errorf("error getting object from s3: %v", err)
}
defer output.Body.Close()
const readBufSizeBytes = 8_192
2021-11-16 06:48:30 +00:00
channels := int(mediaSet.AudioChannels)
modReader := NewModuloBufReader(output.Body, channels*SizeOfInt16)
readBuf := make([]byte, readBufSizeBytes)
peaks := make([]int16, channels*numBins)
2021-11-16 06:48:30 +00:00
totalFrames := endFrame - startFrame
framesPerBin := totalFrames / int64(numBins)
sampleBuf := make([]int16, readBufSizeBytes/SizeOfInt16)
bytesExpected := (endFrame - startFrame) * int64(channels) * SizeOfInt16
2021-11-16 06:48:30 +00:00
var (
bytesRead int64
closing bool
currPeakIndex int
currFrame int64
)
2021-11-16 06:48:30 +00:00
for {
n, err := modReader.Read(readBuf)
if err == io.EOF {
closing = true
} else if err != nil {
2021-11-16 06:48:30 +00:00
return nil, fmt.Errorf("read error: %v", err)
}
bytesRead += int64(n)
samples := sampleBuf[:n/SizeOfInt16]
if err := binary.Read(bytes.NewReader(readBuf[:n]), binary.LittleEndian, samples); err != nil {
2021-11-16 06:48:30 +00:00
return nil, fmt.Errorf("error interpreting samples: %v", err)
}
for i := 0; i < len(samples); i += channels {
for j := 0; j < channels; j++ {
samp := sampleBuf[i+j]
2021-11-16 06:48:30 +00:00
if samp < 0 {
samp = -samp
}
if samp > peaks[currPeakIndex+j] {
peaks[currPeakIndex+j] = samp
2021-11-16 06:48:30 +00:00
}
}
2021-11-16 06:48:30 +00:00
if currFrame == framesPerBin {
currFrame = 0
currPeakIndex += channels
} else {
currFrame++
2021-11-16 06:48:30 +00:00
}
}
if closing {
break
}
}
if bytesRead < bytesExpected {
2021-11-22 20:35:51 +00:00
s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioS3Key.String).Warn("short read from S3")
2021-11-16 06:48:30 +00:00
}
return peaks, nil
}
2021-11-02 16:20:47 +00:00
func sqlString(s string) sql.NullString {
return sql.NullString{String: s, Valid: true}
}
2021-11-02 18:03:26 +00:00
func sqlInt64(i int64) sql.NullInt64 {
return sql.NullInt64{Int64: i, Valid: true}
}
2021-11-21 19:43:40 +00:00
func sqlInt32(i int32) sql.NullInt32 {
return sql.NullInt32{Int32: i, Valid: true}
}
2021-11-16 06:48:30 +00:00
// ModuloBufReader reads from a reader in block sizes that are exactly modulo
// modSize, with any remainder buffered until the next read.
type ModuloBufReader struct {
io.ReadCloser
buf bytes.Buffer
modSize int
}
func NewModuloBufReader(r io.ReadCloser, modSize int) *ModuloBufReader {
return &ModuloBufReader{ReadCloser: r, modSize: modSize}
}
func (r *ModuloBufReader) Read(p []byte) (int, error) {
// err is always io.EOF or nil
nr1, _ := r.buf.Read(p)
nr2, err := r.ReadCloser.Read(p[nr1:])
nr := nr1 + nr2
rem := nr % r.modSize
// if there was an error, return immediately.
if err == io.EOF {
return nr, err
} else if err != nil {
return nr - rem, err
}
// write any remainder to the buffer
if rem != 0 {
// err is always nil
2021-11-16 06:49:44 +00:00
_, _ = r.buf.Write(p[nr-rem : nr])
}
return nr - rem, err
}
2021-11-21 19:43:40 +00:00
type VideoThumbnail struct {
Data []byte
Width, Height int
}
func (s *MediaSetService) GetVideoThumbnail(ctx context.Context, id uuid.UUID) (VideoThumbnail, error) {
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {
return VideoThumbnail{}, fmt.Errorf("error getting media set: %v", err)
}
if mediaSet.VideoThumbnailS3UploadedAt.Valid {
return s.getThumbnailFromS3(ctx, mediaSet)
}
return s.getThumbnailFromYoutube(ctx, mediaSet)
}
func (s *MediaSetService) getThumbnailFromS3(ctx context.Context, mediaSet store.MediaSet) (VideoThumbnail, error) {
input := s3.GetObjectInput{
Bucket: aws.String(mediaSet.VideoThumbnailS3Bucket.String),
Key: aws.String(mediaSet.VideoThumbnailS3Key.String),
}
output, err := s.s3.GetObject(ctx, &input)
if err != nil {
return VideoThumbnail{}, fmt.Errorf("error fetching thumbnail from s3: %v", err)
}
defer output.Body.Close()
imageData, err := io.ReadAll(output.Body)
if err != nil {
return VideoThumbnail{}, fmt.Errorf("error reading thumbnail from s3: %v", err)
}
return VideoThumbnail{
Width: int(mediaSet.VideoThumbnailWidth.Int32),
Height: int(mediaSet.VideoThumbnailHeight.Int32),
Data: imageData,
}, nil
}
func (s *MediaSetService) getThumbnailFromYoutube(ctx context.Context, mediaSet store.MediaSet) (VideoThumbnail, error) {
video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
if err != nil {
return VideoThumbnail{}, fmt.Errorf("error fetching video: %v", err)
}
if len(video.Formats) == 0 {
return VideoThumbnail{}, errors.New("no format available")
}
thumbnails := video.Thumbnails
SortYoutubeThumbnails(thumbnails)
thumbnail := thumbnails[0]
resp, err := http.Get(thumbnail.URL)
if err != nil {
return VideoThumbnail{}, fmt.Errorf("error fetching thumbnail: %v", err)
}
defer resp.Body.Close()
imageData, err := io.ReadAll(resp.Body)
if err != nil {
return VideoThumbnail{}, fmt.Errorf("error reading thumbnail: %v", err)
}
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/thumbnail.jpg", mediaSet.ID)
2021-11-22 20:35:51 +00:00
uploader := newMultipartUploader(s.s3, s.logger)
2021-11-21 19:43:40 +00:00
const mimeType = "application/jpeg"
2021-11-22 18:26:51 +00:00
_, err = uploader.Upload(ctx, bytes.NewReader(imageData), s.config.S3Bucket, s3Key, mimeType)
2021-11-21 19:43:40 +00:00
if err != nil {
return VideoThumbnail{}, fmt.Errorf("error uploading thumbnail: %v", err)
}
storeParams := store.SetVideoThumbnailUploadedParams{
ID: mediaSet.ID,
VideoThumbnailMimeType: sqlString(mimeType),
2021-11-22 18:26:51 +00:00
VideoThumbnailS3Bucket: sqlString(s.config.S3Bucket),
2021-11-21 19:43:40 +00:00
VideoThumbnailS3Key: sqlString(s3Key),
VideoThumbnailWidth: sqlInt32(int32(thumbnail.Width)),
VideoThumbnailHeight: sqlInt32(int32(thumbnail.Height)),
}
if _, err := s.store.SetVideoThumbnailUploaded(ctx, storeParams); err != nil {
return VideoThumbnail{}, fmt.Errorf("error updating media set: %v", err)
}
return VideoThumbnail{Width: int(thumbnail.Width), Height: int(thumbnail.Height), Data: imageData}, nil
}