diff --git a/backend/media/ffmpeg_reader.go b/backend/media/ffmpeg_reader.go index 71ad10b..b21935e 100644 --- a/backend/media/ffmpeg_reader.go +++ b/backend/media/ffmpeg_reader.go @@ -11,7 +11,8 @@ import ( type ffmpegReader struct { io.ReadCloser - cmd *exec.Cmd + cmd *exec.Cmd + stdErrBuf *bytes.Buffer } func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) { @@ -30,7 +31,7 @@ func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmp return nil, fmt.Errorf("error starting ffmpeg: %v, output: %s", err, stdErr.String()) } - return &ffmpegReader{ReadCloser: r, cmd: cmd}, nil + return &ffmpegReader{ReadCloser: r, cmd: cmd, stdErrBuf: &stdErr}, nil } func (r *ffmpegReader) Cancel() error { @@ -48,7 +49,7 @@ func (r *ffmpegReader) Close() error { } if state.ExitCode() != 0 { - return fmt.Errorf("non-zero status %d returned from ffmpeg process", state.ExitCode()) + return fmt.Errorf("non-zero status %d returned from ffmpeg process, output: %s", state.ExitCode(), r.stdErrBuf.String()) } return nil diff --git a/backend/media/service.go b/backend/media/service.go index 4a33352..9a32016 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "io" - "log" "net/http" "strconv" "time" @@ -42,15 +41,25 @@ const ( // progressReader is a reader that prints progress logs as it reads. type progressReader struct { io.Reader + label string total, exp int64 + logger *zap.SugaredLogger } -func (pw *progressReader) Read(p []byte) (int, error) { - n, err := pw.Reader.Read(p) - pw.total += int64(n) +func newProgressReader(reader io.Reader, label string, exp int64, logger *zap.SugaredLogger) *progressReader { + return &progressReader{ + Reader: reader, + exp: exp, + logger: logger.Named(fmt.Sprintf("ProgressReader %s", label)), + } +} - log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0) +func (r *progressReader) Read(p []byte) (int, error) { + n, err := r.Reader.Read(p) + r.total += int64(n) + + r.logger.Debugf("Read %d of %d (%.02f%%) bytes from the provided reader", r.total, r.exp, (float32(r.total)/float32(r.exp))*100.0) return n, err } @@ -348,6 +357,7 @@ func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.Med state := getAudioFromS3State{ getAudioProgressReader: getAudioProgressReader, s3Reader: NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16), + logger: s.logger, } go state.run(ctx) @@ -358,6 +368,7 @@ type getAudioFromS3State struct { *getAudioProgressReader s3Reader io.ReadCloser + logger *zap.SugaredLogger } func (s *getAudioFromS3State) run(ctx context.Context) { @@ -387,13 +398,13 @@ outer: } if err != nil { - log.Printf("error closing s3Reader: %v", err) + s.logger.Errorf("getAudioFromS3State: error closing s3 reader: %v", err) s.Abort(err) return } if iterErr := s.Close(); iterErr != nil { - log.Printf("error closing progress iterator: %v", iterErr) + s.logger.Errorf("getAudioFromS3State: error closing progress iterator: %v", iterErr) } } @@ -413,16 +424,16 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor return nil, fmt.Errorf("error fetching stream: %v", err) } - streamWithProgress := &progressReader{Reader: stream, label: "audio", exp: format.ContentLength} + streamWithProgress := newProgressReader(stream, "audio", format.ContentLength, s.logger) - ffmpegReader, err := newFfmpegReader(ctx, streamWithProgress, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") + ffmpegReader, err := newFfmpegReader(ctx, streamWithProgress, "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") if err != nil { return nil, fmt.Errorf("error creating ffmpegreader: %v", err) } // TODO: use mediaSet func to fetch s3Key s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) - uploader := newMultipartUploader(s.s3) + uploader := newMultipartUploader(s.s3, s.logger) getAudioProgressReader, err := newGetAudioProgressReader( int64(mediaSet.AudioFramesApprox), @@ -441,6 +452,7 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor s3Key: s3Key, store: s.store, channels: format.AudioChannels, + logger: s.logger, } go state.run(ctx, mediaSet.ID) @@ -455,6 +467,7 @@ type getAudioFromYoutubeState struct { s3Bucket, s3Key string store Store channels int + logger *zap.SugaredLogger } func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID) { @@ -465,7 +478,7 @@ func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID // still be active. Kill it. if err != nil { if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil { - log.Printf("error cancelling ffmpegreader: %v", cancelErr) + s.logger.Errorf("getAudioFromYoutubeState: error cancelling ffmpegreader: %v", cancelErr) } } @@ -491,14 +504,14 @@ func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID } if err != nil { - log.Printf("error uploading asynchronously: %v", err) + s.logger.Errorf("getAudioFromYoutubeState: error uploading asynchronously: %v", err) s.Abort(err) return } if iterErr := s.Close(); iterErr != nil { - log.Printf("error closing progress iterator: %v", iterErr) + s.logger.Errorf("getAudioFromYoutubeState: error closing progress iterator: %v", iterErr) } } @@ -581,7 +594,7 @@ func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, sta } if bytesRead < bytesExpected { - s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioS3Key.String).Info("short read from S3") + s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioS3Key.String).Warn("short read from S3") } return peaks, nil @@ -705,7 +718,7 @@ func (s *MediaSetService) getThumbnailFromYoutube(ctx context.Context, mediaSet // TODO: use mediaSet func to fetch s3Key s3Key := fmt.Sprintf("media_sets/%s/thumbnail.jpg", mediaSet.ID) - uploader := newMultipartUploader(s.s3) + uploader := newMultipartUploader(s.s3, s.logger) const mimeType = "application/jpeg" _, err = uploader.Upload(ctx, bytes.NewReader(imageData), s.config.S3Bucket, s3Key, mimeType) diff --git a/backend/media/uploader.go b/backend/media/uploader.go index 6bede6b..b80ca5e 100644 --- a/backend/media/uploader.go +++ b/backend/media/uploader.go @@ -6,20 +6,21 @@ import ( "errors" "fmt" "io" - "log" "sort" "sync" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "go.uber.org/zap" ) // multipartUploader uploads a file to S3. // // TODO: extract to s3 package type multipartUploader struct { - s3 S3Client + s3 S3Client + logger *zap.SugaredLogger } type uploadResult struct { @@ -32,8 +33,8 @@ const ( readBufferSizeBytes = 32_768 // 32Kb ) -func newMultipartUploader(s3Client S3Client) *multipartUploader { - return &multipartUploader{s3: s3Client} +func newMultipartUploader(s3Client S3Client, logger *zap.SugaredLogger) *multipartUploader { + return &multipartUploader{s3: s3Client, logger: logger} } // Upload uploads to an S3 bucket in 5MB parts. It buffers data internally @@ -73,9 +74,9 @@ func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key _, abortErr := u.s3.AbortMultipartUpload(ctxToUse, &input) if abortErr != nil { - log.Printf("error aborting upload: %v", abortErr) + u.logger.Errorf("uploader: error aborting upload: %v", abortErr) } else { - log.Printf("aborted upload, key = %s", key) + u.logger.Infof("aborted upload, key = %s", key) } }() @@ -87,7 +88,7 @@ func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key defer wg.Done() partLen := int64(len(buf)) - log.Printf("uploading part num = %d, len = %d", partNum, partLen) + u.logger.With("key", key, "partNum", partNum, "partLen", partLen).Debug("uploading part") input := s3.UploadPartInput{ Body: bytes.NewReader(buf), @@ -105,7 +106,7 @@ func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key return } - log.Printf("uploaded part num = %d, etag = %s, bytes = %d", partNum, *output.ETag, partLen) + u.logger.With("key", key, "partNum", partNum, "partLen", partLen, "etag", *output.ETag).Debug("uploaded part") uploadResultChan <- uploadResult{ completedPart: types.CompletedPart{ETag: output.ETag, PartNumber: partNum}, @@ -201,11 +202,10 @@ outer: } if _, err = u.s3.CompleteMultipartUpload(ctx, &completeInput); err != nil { - log.Printf("parts: %+v", completedParts) return 0, fmt.Errorf("error completing upload: %v", err) } - log.Printf("completed upload, key = %s, bytesUploaded = %d", key, uploadedBytes) + u.logger.With("key", key, "numParts", len(completedParts), "len", uploadedBytes).Debug("completed upload") uploaded = true return uploadedBytes, nil diff --git a/backend/media/video_progress.go b/backend/media/video_progress.go index 24b65c7..c28196f 100644 --- a/backend/media/video_progress.go +++ b/backend/media/video_progress.go @@ -46,7 +46,7 @@ func newVideoGetter(s3 S3API, store Store, logger *zap.SugaredLogger) *videoGett func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, mediaSetID uuid.UUID, bucket, key, contentType string) (GetVideoProgressReader, error) { s := &videoGetterState{ videoGetter: g, - r: &progressReader{Reader: r, label: "video", exp: exp}, + r: newProgressReader(r, "video", exp, g.logger), exp: exp, mediaSetID: mediaSetID, bucket: bucket, @@ -71,7 +71,7 @@ func (s *videoGetterState) Write(p []byte) (int, error) { } func (s *videoGetterState) getVideo(ctx context.Context) { - uploader := newMultipartUploader(s.s3) + uploader := newMultipartUploader(s.s3, s.logger) teeReader := io.TeeReader(s.r, s) _, err := uploader.Upload(ctx, teeReader, s.bucket, s.key, s.contentType)