clipper/backend/media/service.go

526 lines
15 KiB
Go
Raw Normal View History

2021-10-22 19:30:09 +00:00
package media
2021-10-27 19:34:59 +00:00
import (
"bytes"
2021-10-27 19:34:59 +00:00
"context"
2021-11-01 05:28:40 +00:00
"database/sql"
2021-11-16 06:48:30 +00:00
"encoding/binary"
2021-10-27 19:34:59 +00:00
"errors"
"fmt"
2021-11-01 05:28:40 +00:00
"io"
"strconv"
"strings"
2021-11-01 05:28:40 +00:00
"time"
2021-11-22 18:26:51 +00:00
"git.netflux.io/rob/clipper/config"
2021-11-01 05:28:40 +00:00
"git.netflux.io/rob/clipper/generated/store"
"github.com/google/uuid"
2021-11-25 19:35:51 +00:00
"github.com/jackc/pgx/v4"
2021-11-01 05:28:40 +00:00
youtubev2 "github.com/kkdai/youtube/v2"
2021-11-16 06:48:30 +00:00
"go.uber.org/zap"
2021-11-01 05:28:40 +00:00
)
const (
rawAudioCodec = "pcm_s16le"
rawAudioFormat = "s16le"
rawAudioSampleRate = 48_000
2021-11-02 16:20:47 +00:00
rawAudioMimeType = "audio/raw"
2021-11-01 05:28:40 +00:00
)
const (
thumbnailWidth = 177 // 16:9
thumbnailHeight = 100 // "
2021-10-27 19:34:59 +00:00
)
2021-10-22 19:30:09 +00:00
2021-11-01 05:28:40 +00:00
// MediaSetService exposes logical flows handling MediaSets.
type MediaSetService struct {
2021-12-29 15:38:25 +00:00
store Store
youtube YoutubeClient
fileStore FileStore
commandFunc CommandFunc
2022-01-05 18:49:21 +00:00
workerPool *WorkerPool
2021-12-29 15:38:25 +00:00
config config.Config
logger *zap.SugaredLogger
2021-11-01 05:28:40 +00:00
}
2022-01-05 18:49:21 +00:00
func NewMediaSetService(store Store, youtubeClient YoutubeClient, fileStore FileStore, commandFunc CommandFunc, workerPool *WorkerPool, config config.Config, logger *zap.SugaredLogger) *MediaSetService {
2021-11-01 05:28:40 +00:00
return &MediaSetService{
2021-12-29 15:38:25 +00:00
store: store,
youtube: youtubeClient,
fileStore: fileStore,
commandFunc: commandFunc,
2022-01-05 18:49:21 +00:00
workerPool: workerPool,
2021-12-29 15:38:25 +00:00
config: config,
logger: logger,
2021-11-01 05:28:40 +00:00
}
}
// Get fetches the metadata for a given MediaSet source. If it does not exist
2021-11-29 15:06:43 +00:00
// in the local DB, it will attempt to create it. After the resource has been
// created, other endpoints (e.g. GetPeaks) can be called to fetch media from
2021-12-07 19:58:11 +00:00
// Youtube and store it in a file store.
2021-11-01 05:28:40 +00:00
func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) {
var (
mediaSet *MediaSet
err error
)
mediaSet, err = s.findMediaSet(ctx, youtubeID)
if err != nil {
2021-11-29 15:06:43 +00:00
return nil, fmt.Errorf("error finding existing media set: %v", err)
2021-11-01 05:28:40 +00:00
}
if mediaSet == nil {
mediaSet, err = s.createMediaSet(ctx, youtubeID)
if err != nil {
2021-11-29 15:06:43 +00:00
return nil, fmt.Errorf("error creating new media set: %v", err)
2021-11-01 05:28:40 +00:00
}
}
return mediaSet, nil
}
func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) {
video, err := s.youtube.GetVideoContext(ctx, youtubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
if len(video.Formats) == 0 {
return nil, errors.New("no format available")
}
audioMetadata, err := s.fetchAudioMetadata(ctx, video)
if err != nil {
return nil, fmt.Errorf("error fetching audio metadata: %v", err)
}
videoMetadata, err := s.fetchVideoMetadata(ctx, video)
if err != nil {
return nil, fmt.Errorf("error fetching video metadata: %v", err)
}
storeParams := store.CreateMediaSetParams{
2021-11-01 05:28:40 +00:00
YoutubeID: youtubeID,
Title: strings.TrimSpace(video.Title),
Description: strings.TrimSpace(video.Description),
Author: strings.TrimSpace(video.Author),
2021-11-01 05:28:40 +00:00
AudioYoutubeItag: int32(audioMetadata.YoutubeItag),
AudioChannels: int32(audioMetadata.Channels),
AudioFramesApprox: audioMetadata.ApproxFrames,
2021-11-02 16:20:47 +00:00
AudioSampleRate: int32(audioMetadata.SampleRate),
AudioEncodedMimeType: audioMetadata.MimeType,
2021-11-29 11:46:33 +00:00
AudioContentLength: audioMetadata.ContentLength,
2021-11-01 05:28:40 +00:00
VideoYoutubeItag: int32(videoMetadata.YoutubeItag),
2021-11-29 11:46:33 +00:00
VideoContentLength: videoMetadata.ContentLength,
2021-11-01 05:28:40 +00:00
VideoMimeType: videoMetadata.MimeType,
VideoDurationNanos: videoMetadata.Duration.Nanoseconds(),
}
mediaSet, err := s.store.CreateMediaSet(ctx, storeParams)
2021-11-01 05:28:40 +00:00
if err != nil {
return nil, fmt.Errorf("error creating media set in store: %v", err)
}
return &MediaSet{
ID: mediaSet.ID,
YoutubeID: youtubeID,
Title: mediaSet.Title,
Description: mediaSet.Description,
Author: mediaSet.Author,
Audio: audioMetadata,
Video: videoMetadata,
2021-11-01 05:28:40 +00:00
}, nil
}
// findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist.
func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) {
mediaSet, err := s.store.GetMediaSetByYoutubeID(ctx, youtubeID)
if err != nil {
2021-11-25 19:35:51 +00:00
if err == pgx.ErrNoRows {
2021-11-01 05:28:40 +00:00
return nil, nil
}
2021-11-25 19:35:51 +00:00
return nil, fmt.Errorf("error getting media set: %v", err)
2021-11-01 05:28:40 +00:00
}
return &MediaSet{
ID: mediaSet.ID,
YoutubeID: mediaSet.YoutubeID,
Title: mediaSet.Title,
Description: mediaSet.Description,
Author: mediaSet.Author,
2021-11-01 05:28:40 +00:00
Audio: Audio{
2021-11-29 11:46:33 +00:00
YoutubeItag: int(mediaSet.AudioYoutubeItag),
ContentLength: mediaSet.AudioContentLength,
Channels: int(mediaSet.AudioChannels),
ApproxFrames: int64(mediaSet.AudioFramesApprox),
Frames: mediaSet.AudioFrames.Int64,
SampleRate: int(mediaSet.AudioSampleRate),
MimeType: mediaSet.AudioEncodedMimeType,
2021-11-01 05:28:40 +00:00
},
Video: Video{
YoutubeItag: int(mediaSet.VideoYoutubeItag),
2021-11-29 11:46:33 +00:00
ContentLength: mediaSet.VideoContentLength,
2021-11-01 05:28:40 +00:00
Duration: time.Duration(mediaSet.VideoDurationNanos),
2021-11-01 19:33:45 +00:00
MimeType: mediaSet.VideoMimeType,
2021-11-01 05:28:40 +00:00
ThumbnailWidth: 0, // ??
ThumbnailHeight: 0, // ??
},
}, nil
}
func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) {
formats := FilterYoutubeVideo(video.Formats)
if len(video.Formats) == 0 {
return Video{}, errors.New("no format available")
}
format := formats[0]
durationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return Video{}, fmt.Errorf("could not parse video duration: %s", err)
}
return Video{
YoutubeItag: format.ItagNo,
MimeType: format.MimeType,
2021-11-29 11:46:33 +00:00
ContentLength: format.ContentLength,
2021-11-01 05:28:40 +00:00
ThumbnailWidth: thumbnailWidth,
ThumbnailHeight: thumbnailHeight,
Duration: time.Duration(durationMsecs) * time.Millisecond,
}, nil
}
func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) {
formats := FilterYoutubeAudio(video.Formats)
if len(video.Formats) == 0 {
return Audio{}, errors.New("no format available")
}
format := formats[0]
sampleRate, err := strconv.Atoi(format.AudioSampleRate)
if err != nil {
return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate)
}
approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return Audio{}, fmt.Errorf("could not parse audio duration: %s", err)
}
approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond
approxFrames := int64(approxDuration/time.Second) * int64(sampleRate)
return Audio{
2021-11-29 11:46:33 +00:00
ContentLength: format.ContentLength,
MimeType: format.MimeType,
YoutubeItag: format.ItagNo,
ApproxFrames: approxFrames,
Channels: format.AudioChannels,
SampleRate: sampleRate,
2021-11-01 05:28:40 +00:00
}, nil
}
// GetVideo fetches the video part of a MediaSet.
func (s *MediaSetService) GetVideo(ctx context.Context, id uuid.UUID) (GetVideoProgressReader, error) {
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {
return nil, fmt.Errorf("error getting media set: %v", err)
}
if mediaSet.VideoS3UploadedAt.Valid {
var url string
url, err = s.fileStore.GetURL(ctx, mediaSet.VideoS3Key.String)
2021-12-07 19:58:11 +00:00
if err != nil {
return nil, fmt.Errorf("error generating presigned URL: %v", err)
}
videoGetter := videoGetterFromFileStore(url)
return &videoGetter, nil
}
video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
format := video.Formats.FindByItag(int(mediaSet.VideoYoutubeItag))
if format == nil {
return nil, fmt.Errorf("error finding itag: %v", err)
}
stream, _, err := s.youtube.GetStreamContext(ctx, video, format)
if err != nil {
return nil, fmt.Errorf("error fetching stream: %v", err)
}
2021-12-07 19:58:11 +00:00
// TODO: use mediaSet func to fetch videoKey
videoKey := fmt.Sprintf("media_sets/%s/video.mp4", mediaSet.ID)
2021-11-29 15:06:43 +00:00
2021-12-07 19:58:11 +00:00
videoGetter := newVideoGetter(s.store, s.fileStore, s.logger)
return videoGetter.GetVideo(
ctx,
stream,
format.ContentLength,
mediaSet.ID,
2021-12-07 19:58:11 +00:00
videoKey,
format.MimeType,
)
}
2021-12-29 15:38:25 +00:00
// GetPeaks fetches the audio part of a MediaSet.
func (s *MediaSetService) GetPeaks(ctx context.Context, id uuid.UUID, numBins int) (GetPeaksProgressReader, error) {
2021-11-01 05:28:40 +00:00
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {
return nil, fmt.Errorf("error getting media set: %v", err)
}
// We need both raw and encoded audio to have been uploaded successfully.
// Otherwise, we cannot return both peaks and a presigned URL for use by the
// player.
if mediaSet.AudioRawS3UploadedAt.Valid && mediaSet.AudioEncodedS3UploadedAt.Valid {
return s.getPeaksFromFileStore(ctx, mediaSet, numBins)
2021-11-02 18:03:26 +00:00
}
// Fetch the audio from Youtube, calculate and store the peaks and return
// them.
2021-11-02 18:03:26 +00:00
return s.getAudioFromYoutube(ctx, mediaSet, numBins)
}
func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetPeaksProgressReader, error) {
2022-01-05 18:49:21 +00:00
audioGetter := newAudioGetter(s.store, s.youtube, s.fileStore, s.commandFunc, s.workerPool, s.config, s.logger)
2021-11-29 11:46:33 +00:00
return audioGetter.GetAudio(ctx, mediaSet, numBins)
}
func (s *MediaSetService) getPeaksFromFileStore(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetPeaksProgressReader, error) {
2021-12-07 19:58:11 +00:00
object, err := s.fileStore.GetObject(ctx, mediaSet.AudioRawS3Key.String)
2021-11-02 18:03:26 +00:00
if err != nil {
2021-12-07 19:58:11 +00:00
return nil, fmt.Errorf("error getting object from file store: %v", err)
2021-11-02 18:03:26 +00:00
}
getPeaksProgressReader, err := newGetPeaksProgressReader(
2021-11-02 18:03:26 +00:00
int64(mediaSet.AudioFrames.Int64),
int(mediaSet.AudioChannels),
numBins,
)
if err != nil {
return nil, fmt.Errorf("error creating audio reader: %v", err)
}
2021-11-02 18:03:26 +00:00
state := getPeaksFromFileStoreState{
getPeaksProgressReader: getPeaksProgressReader,
2021-12-17 16:52:59 +00:00
reader: NewModuloReader(object, int(mediaSet.AudioChannels)*SizeOfInt16),
2021-12-07 19:58:11 +00:00
fileStore: s.fileStore,
config: s.config,
2021-11-22 20:35:51 +00:00
logger: s.logger,
2021-11-02 18:03:26 +00:00
}
go state.run(ctx, mediaSet)
2021-11-02 18:03:26 +00:00
return &state, nil
}
type getPeaksFromFileStoreState struct {
*getPeaksProgressReader
2021-11-02 18:03:26 +00:00
2021-12-07 19:58:11 +00:00
reader io.ReadCloser
fileStore FileStore
config config.Config
logger *zap.SugaredLogger
2021-11-02 18:03:26 +00:00
}
func (s *getPeaksFromFileStoreState) run(ctx context.Context, mediaSet store.MediaSet) {
2021-11-02 18:03:26 +00:00
done := make(chan error)
var err error
go func() {
2021-12-07 19:58:11 +00:00
_, copyErr := io.Copy(s, s.reader)
2021-11-02 18:03:26 +00:00
done <- copyErr
}()
outer:
for {
select {
case <-ctx.Done():
err = ctx.Err()
break outer
case err = <-done:
break outer
}
}
2021-12-07 19:58:11 +00:00
if readerErr := s.reader.Close(); readerErr != nil {
2021-11-02 18:03:26 +00:00
if err == nil {
err = readerErr
}
}
if err != nil {
2021-12-07 19:58:11 +00:00
s.logger.Errorf("getAudioFromFileStoreState: error closing reader: %v", err)
2021-11-29 11:46:33 +00:00
s.CloseWithError(err)
2021-11-02 18:03:26 +00:00
return
}
2021-12-07 19:58:11 +00:00
url, err := s.fileStore.GetURL(ctx, mediaSet.AudioEncodedS3Key.String)
if err != nil {
2021-12-07 19:58:11 +00:00
s.CloseWithError(fmt.Errorf("error generating object URL: %v", err))
}
2021-12-07 19:58:11 +00:00
if iterErr := s.Close(url); iterErr != nil {
s.logger.Errorf("getAudioFromFileStoreState: error closing progress iterator: %v", iterErr)
2021-11-02 18:03:26 +00:00
}
}
func (s *MediaSetService) GetPeaksForSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([]int16, error) {
2021-11-30 19:41:34 +00:00
if startFrame < 0 || endFrame < 0 || numBins <= 0 {
s.logger.With("startFrame", startFrame, "endFrame", endFrame, "numBins", numBins).Error("invalid arguments")
return nil, errors.New("invalid arguments")
}
2021-11-16 06:48:30 +00:00
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {
return nil, fmt.Errorf("error getting media set: %v", err)
}
2021-12-07 19:58:11 +00:00
object, err := s.fileStore.GetObjectWithRange(
ctx,
mediaSet.AudioRawS3Key.String,
2021-11-16 06:48:30 +00:00
startFrame*int64(mediaSet.AudioChannels)*SizeOfInt16,
endFrame*int64(mediaSet.AudioChannels)*SizeOfInt16,
)
if err != nil {
2021-12-07 19:58:11 +00:00
return nil, fmt.Errorf("error getting object from file store: %v", err)
2021-11-16 06:48:30 +00:00
}
2021-12-07 19:58:11 +00:00
defer object.Close()
2021-11-16 06:48:30 +00:00
const readBufSizeBytes = 8_192
2021-11-16 06:48:30 +00:00
channels := int(mediaSet.AudioChannels)
2021-12-17 16:52:59 +00:00
modReader := NewModuloReader(object, channels*SizeOfInt16)
readBuf := make([]byte, readBufSizeBytes)
peaks := make([]int16, channels*numBins)
2021-11-16 06:48:30 +00:00
totalFrames := endFrame - startFrame
framesPerBin := totalFrames / int64(numBins)
sampleBuf := make([]int16, readBufSizeBytes/SizeOfInt16)
bytesExpected := (endFrame - startFrame) * int64(channels) * SizeOfInt16
2021-11-16 06:48:30 +00:00
var bytesRead int64
var closing bool
2021-11-16 06:48:30 +00:00
for bin := 0; bin < numBins; bin++ {
framesRemaining := framesPerBin
if bin == numBins-1 {
framesRemaining += totalFrames % int64(numBins)
2021-11-16 06:48:30 +00:00
}
for {
// Read as many bytes as possible, but not exceeding the available buffer
// size nor framesRemaining:
bytesToRead := framesRemaining * int64(channels) * SizeOfInt16
max := int64(len(readBuf))
if bytesToRead > max {
bytesToRead = max
}
n, err := modReader.Read(readBuf[:bytesToRead])
if err == io.EOF {
closing = true
} else if err != nil {
return nil, fmt.Errorf("read error: %v", err)
}
2021-11-16 06:48:30 +00:00
ss := sampleBuf[:n/SizeOfInt16]
if err := binary.Read(bytes.NewReader(readBuf[:n]), binary.LittleEndian, ss); err != nil {
return nil, fmt.Errorf("error interpreting samples: %v", err)
}
pi := bin * channels
for i := 0; i < len(ss); i += channels {
for j := 0; j < channels; j++ {
s := ss[i+j]
if s < 0 {
s = -s
}
if s > peaks[pi+j] {
peaks[pi+j] = s
}
2021-11-16 06:48:30 +00:00
}
}
framesRemaining -= int64(n) / int64(channels) / SizeOfInt16
bytesRead += int64(n)
if closing || framesRemaining == 0 {
break
2021-11-16 06:48:30 +00:00
}
}
if closing {
break
}
}
if bytesRead < bytesExpected {
2021-12-07 19:58:11 +00:00
s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioRawS3Key.String).Warn("short read from file store")
2021-11-16 06:48:30 +00:00
}
return peaks, nil
}
func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, outFormat AudioFormat) (AudioSegmentStream, error) {
2021-12-29 15:38:25 +00:00
if startFrame > endFrame {
return nil, errors.New("invalid range")
2021-11-21 19:43:40 +00:00
}
2021-12-29 15:38:25 +00:00
mediaSet, err := s.store.GetMediaSet(ctx, id)
2021-11-21 19:43:40 +00:00
if err != nil {
2021-12-29 15:38:25 +00:00
return nil, fmt.Errorf("error getting media set: %v", err)
2021-11-21 19:43:40 +00:00
}
2021-12-07 19:58:11 +00:00
// TODO: use mediaSet func to fetch key
2021-12-29 15:38:25 +00:00
key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
startByte := startFrame * int64(mediaSet.AudioChannels) * SizeOfInt16
endByte := endFrame * int64(mediaSet.AudioChannels) * SizeOfInt16
2021-11-21 19:43:40 +00:00
2021-12-29 15:38:25 +00:00
rawAudio, err := s.fileStore.GetObjectWithRange(ctx, key, startByte, endByte)
2021-11-21 19:43:40 +00:00
if err != nil {
2021-12-29 15:38:25 +00:00
return nil, fmt.Errorf("error getting object from store: %v", err)
2021-11-21 19:43:40 +00:00
}
2022-01-05 18:49:21 +00:00
g := newAudioSegmentGetter(s.commandFunc, s.workerPool, rawAudio, mediaSet.AudioChannels, endByte-startByte, outFormat)
2021-12-29 15:38:25 +00:00
go g.getAudioSegment(ctx)
2021-11-21 19:43:40 +00:00
2021-12-29 15:38:25 +00:00
return g.stream, nil
2021-11-21 19:43:40 +00:00
}
2021-11-29 15:06:43 +00:00
// logProgressReader is a reader that prints progress logs as it reads.
type logProgressReader struct {
2021-11-29 15:06:43 +00:00
io.Reader
total, exp int64
logger *zap.SugaredLogger
}
func newLogProgressReader(reader io.Reader, label string, exp int64, logger *zap.SugaredLogger) *logProgressReader {
return &logProgressReader{
2021-11-29 15:06:43 +00:00
Reader: reader,
exp: exp,
2021-12-07 19:58:11 +00:00
logger: logger.Named(label),
2021-11-29 15:06:43 +00:00
}
}
func (r *logProgressReader) Read(p []byte) (int, error) {
2021-11-29 15:06:43 +00:00
n, err := r.Reader.Read(p)
r.total += int64(n)
r.logger.Debugf("Read %d of %d (%.02f%%) bytes from the provided reader", r.total, r.exp, (float32(r.total)/float32(r.exp))*100.0)
return n, err
}
2021-12-29 15:38:25 +00:00
func sqlString(s string) sql.NullString {
return sql.NullString{String: s, Valid: true}
}
func sqlInt64(i int64) sql.NullInt64 {
return sql.NullInt64{Int64: i, Valid: true}
}
func sqlInt32(i int32) sql.NullInt32 {
return sql.NullInt32{Int32: i, Valid: true}
}