2021-10-22 19:30:09 +00:00
package media
2021-10-27 19:34:59 +00:00
import (
2021-11-04 06:13:00 +00:00
"bytes"
2021-10-27 19:34:59 +00:00
"context"
2021-11-01 05:28:40 +00:00
"database/sql"
2021-11-16 06:48:30 +00:00
"encoding/binary"
2021-10-27 19:34:59 +00:00
"errors"
"fmt"
2021-11-01 05:28:40 +00:00
"io"
"strconv"
2022-01-13 19:05:09 +00:00
"strings"
2021-11-01 05:28:40 +00:00
"time"
2021-11-22 18:26:51 +00:00
"git.netflux.io/rob/clipper/config"
2021-11-01 05:28:40 +00:00
"git.netflux.io/rob/clipper/generated/store"
"github.com/google/uuid"
2021-11-25 19:35:51 +00:00
"github.com/jackc/pgx/v4"
2021-11-01 05:28:40 +00:00
youtubev2 "github.com/kkdai/youtube/v2"
2021-11-16 06:48:30 +00:00
"go.uber.org/zap"
2021-11-01 05:28:40 +00:00
)
// Properties of the raw (decoded) audio representation used by this package.
const (
	// rawAudioCodec is the codec identifier for raw audio
	// (presumably signed 16-bit little-endian PCM as named by ffmpeg — confirm).
	rawAudioCodec = "pcm_s16le"
	// rawAudioFormat is the container/format identifier matching rawAudioCodec.
	rawAudioFormat = "s16le"
	// rawAudioSampleRate is the sample rate of raw audio, in frames per second.
	rawAudioSampleRate = 48_000
	// rawAudioMimeType is the MIME type recorded for raw audio objects.
	rawAudioMimeType = "audio/raw"
)
// Thumbnail dimensions in pixels; together they give an approximately 16:9
// aspect ratio.
const (
	thumbnailWidth  = 177 // 16:9
	thumbnailHeight = 100 // 16:9
)
2021-10-22 19:30:09 +00:00
2021-11-01 05:28:40 +00:00
// MediaSetService exposes logical flows handling MediaSets.
type MediaSetService struct {
2021-12-29 15:38:25 +00:00
store Store
youtube YoutubeClient
fileStore FileStore
commandFunc CommandFunc
2022-01-05 18:49:21 +00:00
workerPool * WorkerPool
2021-12-29 15:38:25 +00:00
config config . Config
logger * zap . SugaredLogger
2021-11-01 05:28:40 +00:00
}
2022-01-05 18:49:21 +00:00
func NewMediaSetService ( store Store , youtubeClient YoutubeClient , fileStore FileStore , commandFunc CommandFunc , workerPool * WorkerPool , config config . Config , logger * zap . SugaredLogger ) * MediaSetService {
2021-11-01 05:28:40 +00:00
return & MediaSetService {
2021-12-29 15:38:25 +00:00
store : store ,
youtube : youtubeClient ,
fileStore : fileStore ,
commandFunc : commandFunc ,
2022-01-05 18:49:21 +00:00
workerPool : workerPool ,
2021-12-29 15:38:25 +00:00
config : config ,
logger : logger ,
2021-11-01 05:28:40 +00:00
}
}
2021-11-20 18:29:34 +00:00
// Get fetches the metadata for a given MediaSet source. If it does not exist
2021-11-29 15:06:43 +00:00
// in the local DB, it will attempt to create it. After the resource has been
2021-12-17 16:30:53 +00:00
// created, other endpoints (e.g. GetPeaks) can be called to fetch media from
2021-12-07 19:58:11 +00:00
// Youtube and store it in a file store.
2021-11-01 05:28:40 +00:00
func ( s * MediaSetService ) Get ( ctx context . Context , youtubeID string ) ( * MediaSet , error ) {
var (
mediaSet * MediaSet
err error
)
mediaSet , err = s . findMediaSet ( ctx , youtubeID )
if err != nil {
2021-11-29 15:06:43 +00:00
return nil , fmt . Errorf ( "error finding existing media set: %v" , err )
2021-11-01 05:28:40 +00:00
}
if mediaSet == nil {
mediaSet , err = s . createMediaSet ( ctx , youtubeID )
if err != nil {
2021-11-29 15:06:43 +00:00
return nil , fmt . Errorf ( "error creating new media set: %v" , err )
2021-11-01 05:28:40 +00:00
}
}
return mediaSet , nil
}
func ( s * MediaSetService ) createMediaSet ( ctx context . Context , youtubeID string ) ( * MediaSet , error ) {
video , err := s . youtube . GetVideoContext ( ctx , youtubeID )
if err != nil {
return nil , fmt . Errorf ( "error fetching video: %v" , err )
}
if len ( video . Formats ) == 0 {
return nil , errors . New ( "no format available" )
}
audioMetadata , err := s . fetchAudioMetadata ( ctx , video )
if err != nil {
return nil , fmt . Errorf ( "error fetching audio metadata: %v" , err )
}
videoMetadata , err := s . fetchVideoMetadata ( ctx , video )
if err != nil {
return nil , fmt . Errorf ( "error fetching video metadata: %v" , err )
}
2021-11-20 18:29:34 +00:00
storeParams := store . CreateMediaSetParams {
2021-11-01 05:28:40 +00:00
YoutubeID : youtubeID ,
2022-01-13 19:05:09 +00:00
Title : strings . TrimSpace ( video . Title ) ,
Description : strings . TrimSpace ( video . Description ) ,
Author : strings . TrimSpace ( video . Author ) ,
2021-11-01 05:28:40 +00:00
AudioYoutubeItag : int32 ( audioMetadata . YoutubeItag ) ,
AudioChannels : int32 ( audioMetadata . Channels ) ,
AudioFramesApprox : audioMetadata . ApproxFrames ,
2021-11-02 16:20:47 +00:00
AudioSampleRate : int32 ( audioMetadata . SampleRate ) ,
2021-11-29 13:59:05 +00:00
AudioEncodedMimeType : audioMetadata . MimeType ,
2021-11-29 11:46:33 +00:00
AudioContentLength : audioMetadata . ContentLength ,
2021-11-01 05:28:40 +00:00
VideoYoutubeItag : int32 ( videoMetadata . YoutubeItag ) ,
2021-11-29 11:46:33 +00:00
VideoContentLength : videoMetadata . ContentLength ,
2021-11-01 05:28:40 +00:00
VideoMimeType : videoMetadata . MimeType ,
VideoDurationNanos : videoMetadata . Duration . Nanoseconds ( ) ,
}
2021-11-20 18:29:34 +00:00
mediaSet , err := s . store . CreateMediaSet ( ctx , storeParams )
2021-11-01 05:28:40 +00:00
if err != nil {
return nil , fmt . Errorf ( "error creating media set in store: %v" , err )
}
return & MediaSet {
2022-01-13 19:05:09 +00:00
ID : mediaSet . ID ,
YoutubeID : youtubeID ,
Title : mediaSet . Title ,
Description : mediaSet . Description ,
Author : mediaSet . Author ,
Audio : audioMetadata ,
Video : videoMetadata ,
2021-11-01 05:28:40 +00:00
} , nil
}
// findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist.
func ( s * MediaSetService ) findMediaSet ( ctx context . Context , youtubeID string ) ( * MediaSet , error ) {
mediaSet , err := s . store . GetMediaSetByYoutubeID ( ctx , youtubeID )
if err != nil {
2021-11-25 19:35:51 +00:00
if err == pgx . ErrNoRows {
2021-11-01 05:28:40 +00:00
return nil , nil
}
2021-11-25 19:35:51 +00:00
return nil , fmt . Errorf ( "error getting media set: %v" , err )
2021-11-01 05:28:40 +00:00
}
return & MediaSet {
2022-01-13 19:05:09 +00:00
ID : mediaSet . ID ,
YoutubeID : mediaSet . YoutubeID ,
Title : mediaSet . Title ,
Description : mediaSet . Description ,
Author : mediaSet . Author ,
2021-11-01 05:28:40 +00:00
Audio : Audio {
2021-11-29 11:46:33 +00:00
YoutubeItag : int ( mediaSet . AudioYoutubeItag ) ,
ContentLength : mediaSet . AudioContentLength ,
Channels : int ( mediaSet . AudioChannels ) ,
ApproxFrames : int64 ( mediaSet . AudioFramesApprox ) ,
Frames : mediaSet . AudioFrames . Int64 ,
SampleRate : int ( mediaSet . AudioSampleRate ) ,
2021-11-29 13:59:05 +00:00
MimeType : mediaSet . AudioEncodedMimeType ,
2021-11-01 05:28:40 +00:00
} ,
Video : Video {
YoutubeItag : int ( mediaSet . VideoYoutubeItag ) ,
2021-11-29 11:46:33 +00:00
ContentLength : mediaSet . VideoContentLength ,
2021-11-01 05:28:40 +00:00
Duration : time . Duration ( mediaSet . VideoDurationNanos ) ,
2021-11-01 19:33:45 +00:00
MimeType : mediaSet . VideoMimeType ,
2021-11-01 05:28:40 +00:00
ThumbnailWidth : 0 , // ??
ThumbnailHeight : 0 , // ??
} ,
} , nil
}
func ( s * MediaSetService ) fetchVideoMetadata ( ctx context . Context , video * youtubev2 . Video ) ( Video , error ) {
formats := FilterYoutubeVideo ( video . Formats )
if len ( video . Formats ) == 0 {
return Video { } , errors . New ( "no format available" )
}
format := formats [ 0 ]
durationMsecs , err := strconv . Atoi ( format . ApproxDurationMs )
if err != nil {
return Video { } , fmt . Errorf ( "could not parse video duration: %s" , err )
}
return Video {
YoutubeItag : format . ItagNo ,
MimeType : format . MimeType ,
2021-11-29 11:46:33 +00:00
ContentLength : format . ContentLength ,
2021-11-01 05:28:40 +00:00
ThumbnailWidth : thumbnailWidth ,
ThumbnailHeight : thumbnailHeight ,
Duration : time . Duration ( durationMsecs ) * time . Millisecond ,
} , nil
}
func ( s * MediaSetService ) fetchAudioMetadata ( ctx context . Context , video * youtubev2 . Video ) ( Audio , error ) {
formats := FilterYoutubeAudio ( video . Formats )
if len ( video . Formats ) == 0 {
return Audio { } , errors . New ( "no format available" )
}
format := formats [ 0 ]
sampleRate , err := strconv . Atoi ( format . AudioSampleRate )
if err != nil {
return Audio { } , fmt . Errorf ( "invalid samplerate: %s" , format . AudioSampleRate )
}
approxDurationMsecs , err := strconv . Atoi ( format . ApproxDurationMs )
if err != nil {
return Audio { } , fmt . Errorf ( "could not parse audio duration: %s" , err )
}
approxDuration := time . Duration ( approxDurationMsecs ) * time . Millisecond
approxFrames := int64 ( approxDuration / time . Second ) * int64 ( sampleRate )
return Audio {
2021-11-29 11:46:33 +00:00
ContentLength : format . ContentLength ,
MimeType : format . MimeType ,
YoutubeItag : format . ItagNo ,
ApproxFrames : approxFrames ,
Channels : format . AudioChannels ,
SampleRate : sampleRate ,
2021-11-01 05:28:40 +00:00
} , nil
}
2021-11-20 18:29:34 +00:00
// GetVideo fetches the video part of a MediaSet.
func ( s * MediaSetService ) GetVideo ( ctx context . Context , id uuid . UUID ) ( GetVideoProgressReader , error ) {
mediaSet , err := s . store . GetMediaSet ( ctx , id )
if err != nil {
return nil , fmt . Errorf ( "error getting media set: %v" , err )
}
if mediaSet . VideoS3UploadedAt . Valid {
2022-01-03 19:59:59 +00:00
var url string
url , err = s . fileStore . GetURL ( ctx , mediaSet . VideoS3Key . String )
2021-12-07 19:58:11 +00:00
if err != nil {
return nil , fmt . Errorf ( "error generating presigned URL: %v" , err )
2021-11-20 18:29:34 +00:00
}
2022-01-03 19:59:59 +00:00
videoGetter := videoGetterFromFileStore ( url )
2021-11-20 18:29:34 +00:00
return & videoGetter , nil
}
video , err := s . youtube . GetVideoContext ( ctx , mediaSet . YoutubeID )
if err != nil {
return nil , fmt . Errorf ( "error fetching video: %v" , err )
}
format := video . Formats . FindByItag ( int ( mediaSet . VideoYoutubeItag ) )
if format == nil {
return nil , fmt . Errorf ( "error finding itag: %v" , err )
}
stream , _ , err := s . youtube . GetStreamContext ( ctx , video , format )
if err != nil {
return nil , fmt . Errorf ( "error fetching stream: %v" , err )
}
2021-12-07 19:58:11 +00:00
// TODO: use mediaSet func to fetch videoKey
videoKey := fmt . Sprintf ( "media_sets/%s/video.mp4" , mediaSet . ID )
2021-11-29 15:06:43 +00:00
2021-12-07 19:58:11 +00:00
videoGetter := newVideoGetter ( s . store , s . fileStore , s . logger )
2021-11-20 18:29:34 +00:00
return videoGetter . GetVideo (
ctx ,
stream ,
format . ContentLength ,
mediaSet . ID ,
2021-12-07 19:58:11 +00:00
videoKey ,
2021-11-20 18:29:34 +00:00
format . MimeType ,
)
}
2021-12-29 15:38:25 +00:00
// GetPeaks fetches the audio part of a MediaSet.
2021-12-17 16:30:53 +00:00
func ( s * MediaSetService ) GetPeaks ( ctx context . Context , id uuid . UUID , numBins int ) ( GetPeaksProgressReader , error ) {
2021-11-01 05:28:40 +00:00
mediaSet , err := s . store . GetMediaSet ( ctx , id )
if err != nil {
return nil , fmt . Errorf ( "error getting media set: %v" , err )
}
2021-11-29 14:55:11 +00:00
// We need both raw and encoded audio to have been uploaded successfully.
// Otherwise, we cannot return both peaks and a presigned URL for use by the
// player.
if mediaSet . AudioRawS3UploadedAt . Valid && mediaSet . AudioEncodedS3UploadedAt . Valid {
2021-12-17 16:30:53 +00:00
return s . getPeaksFromFileStore ( ctx , mediaSet , numBins )
2021-11-02 18:03:26 +00:00
}
2021-12-17 16:30:53 +00:00
// Fetch the audio from Youtube, calculate and store the peaks and return
// them.
2021-11-02 18:03:26 +00:00
return s . getAudioFromYoutube ( ctx , mediaSet , numBins )
}
2021-12-17 16:30:53 +00:00
func ( s * MediaSetService ) getAudioFromYoutube ( ctx context . Context , mediaSet store . MediaSet , numBins int ) ( GetPeaksProgressReader , error ) {
2022-01-05 18:49:21 +00:00
audioGetter := newAudioGetter ( s . store , s . youtube , s . fileStore , s . commandFunc , s . workerPool , s . config , s . logger )
2021-11-29 11:46:33 +00:00
return audioGetter . GetAudio ( ctx , mediaSet , numBins )
}
2021-12-17 16:30:53 +00:00
func ( s * MediaSetService ) getPeaksFromFileStore ( ctx context . Context , mediaSet store . MediaSet , numBins int ) ( GetPeaksProgressReader , error ) {
2021-12-07 19:58:11 +00:00
object , err := s . fileStore . GetObject ( ctx , mediaSet . AudioRawS3Key . String )
2021-11-02 18:03:26 +00:00
if err != nil {
2021-12-07 19:58:11 +00:00
return nil , fmt . Errorf ( "error getting object from file store: %v" , err )
2021-11-02 18:03:26 +00:00
}
2021-12-17 16:30:53 +00:00
getPeaksProgressReader , err := newGetPeaksProgressReader (
2021-11-02 18:03:26 +00:00
int64 ( mediaSet . AudioFrames . Int64 ) ,
int ( mediaSet . AudioChannels ) ,
numBins ,
)
2021-11-20 18:29:34 +00:00
if err != nil {
return nil , fmt . Errorf ( "error creating audio reader: %v" , err )
}
2021-11-02 18:03:26 +00:00
2021-12-17 16:30:53 +00:00
state := getPeaksFromFileStoreState {
getPeaksProgressReader : getPeaksProgressReader ,
2021-12-17 16:52:59 +00:00
reader : NewModuloReader ( object , int ( mediaSet . AudioChannels ) * SizeOfInt16 ) ,
2021-12-07 19:58:11 +00:00
fileStore : s . fileStore ,
2021-11-29 14:55:11 +00:00
config : s . config ,
2021-11-22 20:35:51 +00:00
logger : s . logger ,
2021-11-02 18:03:26 +00:00
}
2021-11-29 14:55:11 +00:00
go state . run ( ctx , mediaSet )
2021-11-02 18:03:26 +00:00
return & state , nil
}
2021-12-17 16:30:53 +00:00
type getPeaksFromFileStoreState struct {
* getPeaksProgressReader
2021-11-02 18:03:26 +00:00
2021-12-07 19:58:11 +00:00
reader io . ReadCloser
fileStore FileStore
config config . Config
logger * zap . SugaredLogger
2021-11-02 18:03:26 +00:00
}
2021-12-17 16:30:53 +00:00
func ( s * getPeaksFromFileStoreState ) run ( ctx context . Context , mediaSet store . MediaSet ) {
2021-11-02 18:03:26 +00:00
done := make ( chan error )
var err error
go func ( ) {
2021-12-07 19:58:11 +00:00
_ , copyErr := io . Copy ( s , s . reader )
2021-11-02 18:03:26 +00:00
done <- copyErr
} ( )
outer :
for {
select {
case <- ctx . Done ( ) :
err = ctx . Err ( )
break outer
case err = <- done :
break outer
}
}
2021-12-07 19:58:11 +00:00
if readerErr := s . reader . Close ( ) ; readerErr != nil {
2021-11-02 18:03:26 +00:00
if err == nil {
err = readerErr
}
}
if err != nil {
2021-12-07 19:58:11 +00:00
s . logger . Errorf ( "getAudioFromFileStoreState: error closing reader: %v" , err )
2021-11-29 11:46:33 +00:00
s . CloseWithError ( err )
2021-11-02 18:03:26 +00:00
return
}
2021-12-07 19:58:11 +00:00
url , err := s . fileStore . GetURL ( ctx , mediaSet . AudioEncodedS3Key . String )
2021-11-29 14:55:11 +00:00
if err != nil {
2021-12-07 19:58:11 +00:00
s . CloseWithError ( fmt . Errorf ( "error generating object URL: %v" , err ) )
2021-11-29 14:55:11 +00:00
}
2021-12-07 19:58:11 +00:00
if iterErr := s . Close ( url ) ; iterErr != nil {
s . logger . Errorf ( "getAudioFromFileStoreState: error closing progress iterator: %v" , iterErr )
2021-11-02 18:03:26 +00:00
}
}
2021-12-17 16:30:53 +00:00
func ( s * MediaSetService ) GetPeaksForSegment ( ctx context . Context , id uuid . UUID , startFrame , endFrame int64 , numBins int ) ( [ ] int16 , error ) {
2022-02-04 15:22:06 +00:00
if startFrame < 0 || endFrame < 0 || numBins <= 0 || startFrame == endFrame {
2021-11-30 19:41:34 +00:00
s . logger . With ( "startFrame" , startFrame , "endFrame" , endFrame , "numBins" , numBins ) . Error ( "invalid arguments" )
return nil , errors . New ( "invalid arguments" )
}
2021-11-16 06:48:30 +00:00
mediaSet , err := s . store . GetMediaSet ( ctx , id )
if err != nil {
return nil , fmt . Errorf ( "error getting media set: %v" , err )
}
2021-12-07 19:58:11 +00:00
object , err := s . fileStore . GetObjectWithRange (
ctx ,
mediaSet . AudioRawS3Key . String ,
2021-11-16 06:48:30 +00:00
startFrame * int64 ( mediaSet . AudioChannels ) * SizeOfInt16 ,
endFrame * int64 ( mediaSet . AudioChannels ) * SizeOfInt16 ,
)
if err != nil {
2021-12-07 19:58:11 +00:00
return nil , fmt . Errorf ( "error getting object from file store: %v" , err )
2021-11-16 06:48:30 +00:00
}
2021-12-07 19:58:11 +00:00
defer object . Close ( )
2021-11-16 06:48:30 +00:00
2021-11-17 17:53:27 +00:00
const readBufSizeBytes = 8_192
2021-11-16 06:48:30 +00:00
channels := int ( mediaSet . AudioChannels )
2021-12-17 16:52:59 +00:00
modReader := NewModuloReader ( object , channels * SizeOfInt16 )
2021-11-17 17:53:27 +00:00
readBuf := make ( [ ] byte , readBufSizeBytes )
peaks := make ( [ ] int16 , channels * numBins )
2021-11-16 06:48:30 +00:00
totalFrames := endFrame - startFrame
framesPerBin := totalFrames / int64 ( numBins )
2021-11-17 17:53:27 +00:00
sampleBuf := make ( [ ] int16 , readBufSizeBytes / SizeOfInt16 )
bytesExpected := ( endFrame - startFrame ) * int64 ( channels ) * SizeOfInt16
2021-11-16 06:48:30 +00:00
2022-01-07 12:31:52 +00:00
var bytesRead int64
var closing bool
2021-11-16 06:48:30 +00:00
2022-01-07 12:31:52 +00:00
for bin := 0 ; bin < numBins ; bin ++ {
framesRemaining := framesPerBin
if bin == numBins - 1 {
framesRemaining += totalFrames % int64 ( numBins )
2021-11-16 06:48:30 +00:00
}
2022-01-07 12:31:52 +00:00
for {
// Read as many bytes as possible, but not exceeding the available buffer
// size nor framesRemaining:
bytesToRead := framesRemaining * int64 ( channels ) * SizeOfInt16
max := int64 ( len ( readBuf ) )
if bytesToRead > max {
bytesToRead = max
}
2021-11-17 17:53:27 +00:00
2022-01-07 12:31:52 +00:00
n , err := modReader . Read ( readBuf [ : bytesToRead ] )
if err == io . EOF {
closing = true
} else if err != nil {
return nil , fmt . Errorf ( "read error: %v" , err )
}
2021-11-16 06:48:30 +00:00
2022-01-07 12:31:52 +00:00
ss := sampleBuf [ : n / SizeOfInt16 ]
if err := binary . Read ( bytes . NewReader ( readBuf [ : n ] ) , binary . LittleEndian , ss ) ; err != nil {
return nil , fmt . Errorf ( "error interpreting samples: %v" , err )
}
p i := bin * channels
for i := 0 ; i < len ( ss ) ; i += channels {
for j := 0 ; j < channels ; j ++ {
s := ss [ i + j ]
if s < 0 {
s = - s
}
if s > peaks [ pi + j ] {
peaks [ pi + j ] = s
}
2021-11-16 06:48:30 +00:00
}
}
2021-11-17 17:53:27 +00:00
2022-01-07 12:31:52 +00:00
framesRemaining -= int64 ( n ) / int64 ( channels ) / SizeOfInt16
bytesRead += int64 ( n )
if closing || framesRemaining == 0 {
break
2021-11-16 06:48:30 +00:00
}
}
2021-11-17 17:53:27 +00:00
if closing {
break
}
}
if bytesRead < bytesExpected {
2021-12-07 19:58:11 +00:00
s . logger . With ( "startFrame" , startFrame , "endFrame" , endFrame , "got" , bytesRead , "want" , bytesExpected , "key" , mediaSet . AudioRawS3Key . String ) . Warn ( "short read from file store" )
2021-11-16 06:48:30 +00:00
}
return peaks , nil
}
2022-01-10 17:52:04 +00:00
func ( s * MediaSetService ) GetAudioSegment ( ctx context . Context , id uuid . UUID , startFrame , endFrame int64 , outFormat AudioFormat ) ( AudioSegmentStream , error ) {
2021-12-29 15:38:25 +00:00
if startFrame > endFrame {
return nil , errors . New ( "invalid range" )
2021-11-21 19:43:40 +00:00
}
2021-12-29 15:38:25 +00:00
mediaSet , err := s . store . GetMediaSet ( ctx , id )
2021-11-21 19:43:40 +00:00
if err != nil {
2021-12-29 15:38:25 +00:00
return nil , fmt . Errorf ( "error getting media set: %v" , err )
2021-11-21 19:43:40 +00:00
}
2021-12-07 19:58:11 +00:00
// TODO: use mediaSet func to fetch key
2021-12-29 15:38:25 +00:00
key := fmt . Sprintf ( "media_sets/%s/audio.raw" , mediaSet . ID )
startByte := startFrame * int64 ( mediaSet . AudioChannels ) * SizeOfInt16
endByte := endFrame * int64 ( mediaSet . AudioChannels ) * SizeOfInt16
2021-11-21 19:43:40 +00:00
2021-12-29 15:38:25 +00:00
rawAudio , err := s . fileStore . GetObjectWithRange ( ctx , key , startByte , endByte )
2021-11-21 19:43:40 +00:00
if err != nil {
2021-12-29 15:38:25 +00:00
return nil , fmt . Errorf ( "error getting object from store: %v" , err )
2021-11-21 19:43:40 +00:00
}
2022-01-05 18:49:21 +00:00
g := newAudioSegmentGetter ( s . commandFunc , s . workerPool , rawAudio , mediaSet . AudioChannels , endByte - startByte , outFormat )
2021-12-29 15:38:25 +00:00
go g . getAudioSegment ( ctx )
2021-11-21 19:43:40 +00:00
2021-12-29 15:38:25 +00:00
return g . stream , nil
2021-11-21 19:43:40 +00:00
}
2021-11-29 15:06:43 +00:00
2021-12-17 16:30:53 +00:00
// logProgressReader is a reader that prints progress logs as it reads.
type logProgressReader struct {
2021-11-29 15:06:43 +00:00
io . Reader
total , exp int64
logger * zap . SugaredLogger
}
2021-12-17 16:30:53 +00:00
func newLogProgressReader ( reader io . Reader , label string , exp int64 , logger * zap . SugaredLogger ) * logProgressReader {
return & logProgressReader {
2021-11-29 15:06:43 +00:00
Reader : reader ,
exp : exp ,
2021-12-07 19:58:11 +00:00
logger : logger . Named ( label ) ,
2021-11-29 15:06:43 +00:00
}
}
2021-12-17 16:30:53 +00:00
func ( r * logProgressReader ) Read ( p [ ] byte ) ( int , error ) {
2021-11-29 15:06:43 +00:00
n , err := r . Reader . Read ( p )
r . total += int64 ( n )
r . logger . Debugf ( "Read %d of %d (%.02f%%) bytes from the provided reader" , r . total , r . exp , ( float32 ( r . total ) / float32 ( r . exp ) ) * 100.0 )
return n , err
}
2021-12-29 15:38:25 +00:00
// sqlString wraps s in a valid (non-NULL) sql.NullString.
func sqlString(s string) sql.NullString {
	var out sql.NullString
	out.String = s
	out.Valid = true
	return out
}
// sqlInt64 wraps i in a valid (non-NULL) sql.NullInt64.
func sqlInt64(i int64) sql.NullInt64 {
	var out sql.NullInt64
	out.Int64 = i
	out.Valid = true
	return out
}
// sqlInt32 wraps i in a valid (non-NULL) sql.NullInt32.
func sqlInt32(i int32) sql.NullInt32 {
	var out sql.NullInt32
	out.Int32 = i
	out.Valid = true
	return out
}