Replace log package with zap.Logger

This commit is contained in:
Rob Watson 2021-11-22 21:35:51 +01:00
parent b643ea2824
commit 61171b00af
4 changed files with 44 additions and 30 deletions

View File

@ -11,7 +11,8 @@ import (
type ffmpegReader struct { type ffmpegReader struct {
io.ReadCloser io.ReadCloser
cmd *exec.Cmd cmd *exec.Cmd
stdErrBuf *bytes.Buffer
} }
func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) { func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) {
@ -30,7 +31,7 @@ func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmp
return nil, fmt.Errorf("error starting ffmpeg: %v, output: %s", err, stdErr.String()) return nil, fmt.Errorf("error starting ffmpeg: %v, output: %s", err, stdErr.String())
} }
return &ffmpegReader{ReadCloser: r, cmd: cmd}, nil return &ffmpegReader{ReadCloser: r, cmd: cmd, stdErrBuf: &stdErr}, nil
} }
func (r *ffmpegReader) Cancel() error { func (r *ffmpegReader) Cancel() error {
@ -48,7 +49,7 @@ func (r *ffmpegReader) Close() error {
} }
if state.ExitCode() != 0 { if state.ExitCode() != 0 {
return fmt.Errorf("non-zero status %d returned from ffmpeg process", state.ExitCode()) return fmt.Errorf("non-zero status %d returned from ffmpeg process, output: %s", state.ExitCode(), r.stdErrBuf.String())
} }
return nil return nil

View File

@ -8,7 +8,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
@ -42,15 +41,25 @@ const (
// progressReader is a reader that prints progress logs as it reads. // progressReader is a reader that prints progress logs as it reads.
type progressReader struct { type progressReader struct {
io.Reader io.Reader
label string label string
total, exp int64 total, exp int64
logger *zap.SugaredLogger
} }
func (pw *progressReader) Read(p []byte) (int, error) { func newProgressReader(reader io.Reader, label string, exp int64, logger *zap.SugaredLogger) *progressReader {
n, err := pw.Reader.Read(p) return &progressReader{
pw.total += int64(n) Reader: reader,
exp: exp,
logger: logger.Named(fmt.Sprintf("ProgressReader %s", label)),
}
}
log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0) func (r *progressReader) Read(p []byte) (int, error) {
n, err := r.Reader.Read(p)
r.total += int64(n)
r.logger.Debugf("Read %d of %d (%.02f%%) bytes from the provided reader", r.total, r.exp, (float32(r.total)/float32(r.exp))*100.0)
return n, err return n, err
} }
@ -348,6 +357,7 @@ func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.Med
state := getAudioFromS3State{ state := getAudioFromS3State{
getAudioProgressReader: getAudioProgressReader, getAudioProgressReader: getAudioProgressReader,
s3Reader: NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16), s3Reader: NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16),
logger: s.logger,
} }
go state.run(ctx) go state.run(ctx)
@ -358,6 +368,7 @@ type getAudioFromS3State struct {
*getAudioProgressReader *getAudioProgressReader
s3Reader io.ReadCloser s3Reader io.ReadCloser
logger *zap.SugaredLogger
} }
func (s *getAudioFromS3State) run(ctx context.Context) { func (s *getAudioFromS3State) run(ctx context.Context) {
@ -387,13 +398,13 @@ outer:
} }
if err != nil { if err != nil {
log.Printf("error closing s3Reader: %v", err) s.logger.Errorf("getAudioFromS3State: error closing s3 reader: %v", err)
s.Abort(err) s.Abort(err)
return return
} }
if iterErr := s.Close(); iterErr != nil { if iterErr := s.Close(); iterErr != nil {
log.Printf("error closing progress iterator: %v", iterErr) s.logger.Errorf("getAudioFromS3State: error closing progress iterator: %v", iterErr)
} }
} }
@ -413,16 +424,16 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor
return nil, fmt.Errorf("error fetching stream: %v", err) return nil, fmt.Errorf("error fetching stream: %v", err)
} }
streamWithProgress := &progressReader{Reader: stream, label: "audio", exp: format.ContentLength} streamWithProgress := newProgressReader(stream, "audio", format.ContentLength, s.logger)
ffmpegReader, err := newFfmpegReader(ctx, streamWithProgress, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") ffmpegReader, err := newFfmpegReader(ctx, streamWithProgress, "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating ffmpegreader: %v", err) return nil, fmt.Errorf("error creating ffmpegreader: %v", err)
} }
// TODO: use mediaSet func to fetch s3Key // TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
uploader := newMultipartUploader(s.s3) uploader := newMultipartUploader(s.s3, s.logger)
getAudioProgressReader, err := newGetAudioProgressReader( getAudioProgressReader, err := newGetAudioProgressReader(
int64(mediaSet.AudioFramesApprox), int64(mediaSet.AudioFramesApprox),
@ -441,6 +452,7 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor
s3Key: s3Key, s3Key: s3Key,
store: s.store, store: s.store,
channels: format.AudioChannels, channels: format.AudioChannels,
logger: s.logger,
} }
go state.run(ctx, mediaSet.ID) go state.run(ctx, mediaSet.ID)
@ -455,6 +467,7 @@ type getAudioFromYoutubeState struct {
s3Bucket, s3Key string s3Bucket, s3Key string
store Store store Store
channels int channels int
logger *zap.SugaredLogger
} }
func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID) { func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID) {
@ -465,7 +478,7 @@ func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID
// still be active. Kill it. // still be active. Kill it.
if err != nil { if err != nil {
if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil { if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil {
log.Printf("error cancelling ffmpegreader: %v", cancelErr) s.logger.Errorf("getAudioFromYoutubeState: error cancelling ffmpegreader: %v", cancelErr)
} }
} }
@ -491,14 +504,14 @@ func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID
} }
if err != nil { if err != nil {
log.Printf("error uploading asynchronously: %v", err) s.logger.Errorf("getAudioFromYoutubeState: error uploading asynchronously: %v", err)
s.Abort(err) s.Abort(err)
return return
} }
if iterErr := s.Close(); iterErr != nil { if iterErr := s.Close(); iterErr != nil {
log.Printf("error closing progress iterator: %v", iterErr) s.logger.Errorf("getAudioFromYoutubeState: error closing progress iterator: %v", iterErr)
} }
} }
@ -581,7 +594,7 @@ func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, sta
} }
if bytesRead < bytesExpected { if bytesRead < bytesExpected {
s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioS3Key.String).Info("short read from S3") s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioS3Key.String).Warn("short read from S3")
} }
return peaks, nil return peaks, nil
@ -705,7 +718,7 @@ func (s *MediaSetService) getThumbnailFromYoutube(ctx context.Context, mediaSet
// TODO: use mediaSet func to fetch s3Key // TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/thumbnail.jpg", mediaSet.ID) s3Key := fmt.Sprintf("media_sets/%s/thumbnail.jpg", mediaSet.ID)
uploader := newMultipartUploader(s.s3) uploader := newMultipartUploader(s.s3, s.logger)
const mimeType = "application/jpeg" const mimeType = "application/jpeg"
_, err = uploader.Upload(ctx, bytes.NewReader(imageData), s.config.S3Bucket, s3Key, mimeType) _, err = uploader.Upload(ctx, bytes.NewReader(imageData), s.config.S3Bucket, s3Key, mimeType)

View File

@ -6,20 +6,21 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log"
"sort" "sort"
"sync" "sync"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go-v2/service/s3/types"
"go.uber.org/zap"
) )
// multipartUploader uploads a file to S3. // multipartUploader uploads a file to S3.
// //
// TODO: extract to s3 package // TODO: extract to s3 package
type multipartUploader struct { type multipartUploader struct {
s3 S3Client s3 S3Client
logger *zap.SugaredLogger
} }
type uploadResult struct { type uploadResult struct {
@ -32,8 +33,8 @@ const (
readBufferSizeBytes = 32_768 // 32Kb readBufferSizeBytes = 32_768 // 32Kb
) )
func newMultipartUploader(s3Client S3Client) *multipartUploader { func newMultipartUploader(s3Client S3Client, logger *zap.SugaredLogger) *multipartUploader {
return &multipartUploader{s3: s3Client} return &multipartUploader{s3: s3Client, logger: logger}
} }
// Upload uploads to an S3 bucket in 5MB parts. It buffers data internally // Upload uploads to an S3 bucket in 5MB parts. It buffers data internally
@ -73,9 +74,9 @@ func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key
_, abortErr := u.s3.AbortMultipartUpload(ctxToUse, &input) _, abortErr := u.s3.AbortMultipartUpload(ctxToUse, &input)
if abortErr != nil { if abortErr != nil {
log.Printf("error aborting upload: %v", abortErr) u.logger.Errorf("uploader: error aborting upload: %v", abortErr)
} else { } else {
log.Printf("aborted upload, key = %s", key) u.logger.Infof("aborted upload, key = %s", key)
} }
}() }()
@ -87,7 +88,7 @@ func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key
defer wg.Done() defer wg.Done()
partLen := int64(len(buf)) partLen := int64(len(buf))
log.Printf("uploading part num = %d, len = %d", partNum, partLen) u.logger.With("key", key, "partNum", partNum, "partLen", partLen).Debug("uploading part")
input := s3.UploadPartInput{ input := s3.UploadPartInput{
Body: bytes.NewReader(buf), Body: bytes.NewReader(buf),
@ -105,7 +106,7 @@ func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key
return return
} }
log.Printf("uploaded part num = %d, etag = %s, bytes = %d", partNum, *output.ETag, partLen) u.logger.With("key", key, "partNum", partNum, "partLen", partLen, "etag", *output.ETag).Debug("uploaded part")
uploadResultChan <- uploadResult{ uploadResultChan <- uploadResult{
completedPart: types.CompletedPart{ETag: output.ETag, PartNumber: partNum}, completedPart: types.CompletedPart{ETag: output.ETag, PartNumber: partNum},
@ -201,11 +202,10 @@ outer:
} }
if _, err = u.s3.CompleteMultipartUpload(ctx, &completeInput); err != nil { if _, err = u.s3.CompleteMultipartUpload(ctx, &completeInput); err != nil {
log.Printf("parts: %+v", completedParts)
return 0, fmt.Errorf("error completing upload: %v", err) return 0, fmt.Errorf("error completing upload: %v", err)
} }
log.Printf("completed upload, key = %s, bytesUploaded = %d", key, uploadedBytes) u.logger.With("key", key, "numParts", len(completedParts), "len", uploadedBytes).Debug("completed upload")
uploaded = true uploaded = true
return uploadedBytes, nil return uploadedBytes, nil

View File

@ -46,7 +46,7 @@ func newVideoGetter(s3 S3API, store Store, logger *zap.SugaredLogger) *videoGett
func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, mediaSetID uuid.UUID, bucket, key, contentType string) (GetVideoProgressReader, error) { func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, mediaSetID uuid.UUID, bucket, key, contentType string) (GetVideoProgressReader, error) {
s := &videoGetterState{ s := &videoGetterState{
videoGetter: g, videoGetter: g,
r: &progressReader{Reader: r, label: "video", exp: exp}, r: newProgressReader(r, "video", exp, g.logger),
exp: exp, exp: exp,
mediaSetID: mediaSetID, mediaSetID: mediaSetID,
bucket: bucket, bucket: bucket,
@ -71,7 +71,7 @@ func (s *videoGetterState) Write(p []byte) (int, error) {
} }
func (s *videoGetterState) getVideo(ctx context.Context) { func (s *videoGetterState) getVideo(ctx context.Context) {
uploader := newMultipartUploader(s.s3) uploader := newMultipartUploader(s.s3, s.logger)
teeReader := io.TeeReader(s.r, s) teeReader := io.TeeReader(s.r, s)
_, err := uploader.Upload(ctx, teeReader, s.bucket, s.key, s.contentType) _, err := uploader.Upload(ctx, teeReader, s.bucket, s.key, s.contentType)