clipper/backend/media/get_segment.go

178 lines
4.8 KiB
Go

package media
//go:generate mockery --recursive --name AudioSegmentStream --output ../generated/mocks
import (
"bytes"
"context"
"fmt"
"io"
"os/exec"
"strconv"
"sync"
)
// CommandFunc builds an *exec.Cmd from a context, a program name and the
// program's arguments. Its signature matches exec.CommandContext, so that
// function can be used directly; taking it as a value lets the caller decide
// how the command is constructed.
type CommandFunc func(ctx context.Context, name string, arg ...string) *exec.Cmd
// AudioFormat identifies an audio container/encoding, such as WAV or MP3.
type AudioFormat int

// Supported audio formats. The zero value is AudioFormatWAV.
const (
	// AudioFormatWAV selects WAV output.
	AudioFormatWAV AudioFormat = iota
	// AudioFormatMP3 selects MP3 output.
	AudioFormatMP3
)

// String implements fmt.Stringer, returning the short lowercase name of the
// format ("wav" or "mp3"). It panics if f is not one of the declared
// constants.
func (f AudioFormat) String() string {
	if f == AudioFormatWAV {
		return "wav"
	}
	if f == AudioFormatMP3 {
		return "mp3"
	}
	panic("unknown audio format")
}
// AudioSegmentProgress is a single update emitted by an AudioSegmentStream: a
// chunk of audio data together with an approximate indication of overall
// progress.
type AudioSegmentProgress struct {
// PercentComplete is the approximate progress expressed as a percentage. In
// this file it is derived from the bytes read so far relative to the number
// of bytes expected.
PercentComplete float32
// Data is the chunk of audio bytes carried by this update.
Data []byte
}
// AudioSegmentStream is a stream of AudioSegmentProgress structs. The Next()
// method must be called until it returns io.EOF to avoid resource leakage.
type AudioSegmentStream interface {
// Next returns the next progress update from the stream. The implementation
// in this file returns io.EOF once the stream has ended successfully, the
// stream's error if it failed, or ctx.Err() if ctx is done first.
Next(ctx context.Context) (AudioSegmentProgress, error)
}
// audioSegmentStream implements AudioSegmentStream on top of two channels:
// one carrying progress updates and one carrying a terminal error.
type audioSegmentStream struct {
// progressChan carries progress updates; it is closed by close() to signal
// the successful end of the stream. newAudioSegmentGetter creates it
// unbuffered.
progressChan chan AudioSegmentProgress
// errorChan carries the error passed to closeWithError().
// newAudioSegmentGetter creates it with a buffer of one.
errorChan chan error
}
// send publishes a new partial segment and progress update to the stream.
// The slice p is placed in the update as-is (it is not copied), and the call
// blocks until the update has been handed to progressChan.
func (s *audioSegmentStream) send(p []byte, percentComplete float32) {
	update := AudioSegmentProgress{PercentComplete: percentComplete, Data: p}
	s.progressChan <- update
}
// close signals the successful end of the stream of data by closing
// progressChan; Next() reports this to the consumer as io.EOF. No further
// calls to send() may be made afterwards, since sending on a closed channel
// panics.
func (s *audioSegmentStream) close() {
close(s.progressChan)
}
// closeWithError signals the unsuccessful end of a stream of data by sending
// err on errorChan, from where Next() returns it to the consumer. It does not
// close progressChan.
func (s *audioSegmentStream) closeWithError(err error) {
s.errorChan <- err
}
// audioSegmentGetter gets an audio segment and streams it to the caller. It
// serves as both the stdin (via Read) and the stdout (via Write) of the
// ffmpeg command started by getAudioSegment.
type audioSegmentGetter struct {
// mu guards bytesRead, which is updated by Read and read by
// percentComplete.
mu sync.Mutex
// commandFunc builds the ffmpeg command.
commandFunc CommandFunc
// workerPool runs the ffmpeg task via WaitForTask.
workerPool *WorkerPool
// rawAudio is the source of raw audio; getAudioSegment closes it on return.
rawAudio io.ReadCloser
// channels is passed to ffmpeg as the input channel count (-ac).
channels int32
// outFormat selects the ffmpeg output format (-f).
outFormat AudioFormat
// stream receives the command's output and the final success or error
// signal.
stream *audioSegmentStream
// bytesRead counts the bytes read from rawAudio so far; bytesExpected is the
// total used as the denominator when computing progress.
bytesRead, bytesExpected int64
}
// newAudioSegmentGetter builds an audioSegmentGetter around the given raw
// audio source, together with a fresh stream made of an unbuffered progress
// channel and an error channel with room for one error. The io.ReadCloser
// will be consumed and closed by the getAudioSegment() function.
func newAudioSegmentGetter(commandFunc CommandFunc, workerPool *WorkerPool, rawAudio io.ReadCloser, channels int32, bytesExpected int64, outFormat AudioFormat) *audioSegmentGetter {
	stream := &audioSegmentStream{
		progressChan: make(chan AudioSegmentProgress),
		errorChan:    make(chan error, 1),
	}

	getter := new(audioSegmentGetter)
	getter.commandFunc = commandFunc
	getter.workerPool = workerPool
	getter.rawAudio = rawAudio
	getter.channels = channels
	getter.bytesExpected = bytesExpected
	getter.outFormat = outFormat
	getter.stream = stream
	return getter
}
// Read implements io.Reader and is consumed by the stdin of the FFMPEG
// command. It delegates to rawAudio and adds the number of bytes read to
// bytesRead under mu, so that percentComplete can report progress. It is
// called from a separate goroutine to Write().
func (s *audioSegmentGetter) Read(p []byte) (int, error) {
	read, readErr := s.rawAudio.Read(p)

	s.mu.Lock()
	s.bytesRead += int64(read)
	s.mu.Unlock()

	return read, readErr
}
// Write implements io.Writer and consumes the stdout of the FFMPEG command.
// Each call publishes one progress update to the stream, carrying a copy of p
// and the current completion percentage, and blocks until the update has been
// handed over. It always reports len(p) bytes written and a nil error. It is
// called from a separate goroutine to Read().
func (s *audioSegmentGetter) Write(p []byte) (int, error) {
	// io.Writer implementations must not retain p: the caller may reuse the
	// buffer as soon as Write returns, while the consumer of the stream can
	// still be holding the published slice. Publish a private copy instead.
	chunk := make([]byte, len(p))
	copy(chunk, p)

	s.stream.send(chunk, s.percentComplete())
	return len(p), nil
}
// percentComplete reports approximately how much of the expected raw input
// has been read so far, as a percentage.
//
// It returns 0 when bytesExpected is not positive, because the ratio would
// otherwise be NaN, infinite or negative, and caps the result at 100 in case
// more bytes are read than were expected. bytesRead is accessed under mu
// because Read updates it from another goroutine.
func (s *audioSegmentGetter) percentComplete() float32 {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.bytesExpected <= 0 {
		return 0
	}

	pct := (float32(s.bytesRead) / float32(s.bytesExpected)) * 100
	if pct > 100 {
		return 100
	}
	return pct
}
// Next implements AudioSegmentStream. It blocks until one of the following
// happens and returns accordingly:
//
//   - a progress update arrives: the update and a nil error;
//   - the progress channel has been closed: a zero update and io.EOF;
//   - an error has been published: a zero update and that error;
//   - ctx is done: a zero update and ctx.Err().
func (s *audioSegmentStream) Next(ctx context.Context) (AudioSegmentProgress, error) {
	var none AudioSegmentProgress

	select {
	case <-ctx.Done():
		return none, ctx.Err()
	case streamErr := <-s.errorChan:
		return none, streamErr
	case update, open := <-s.progressChan:
		if open {
			return update, nil
		}
		return none, io.EOF
	}
}
// getAudioSegment pipes the raw audio through an ffmpeg command, submitted to
// the worker pool, and publishes the command's output on s.stream.
//
// The getter itself is wired up as both the stdin and the stdout of the
// command, so Read feeds ffmpeg from rawAudio and Write forwards the encoded
// output to the stream. When the task finishes, the stream is ended with
// close() on success or closeWithError() on failure. rawAudio is always
// closed before returning.
func (s *audioSegmentGetter) getAudioSegment(ctx context.Context) {
	// The close error is deliberately ignored: by the time it runs, the
	// outcome has already been reported on the stream.
	defer s.rawAudio.Close()

	err := s.workerPool.WaitForTask(ctx, func() error {
		// Collect ffmpeg's stderr so it can be included in any error.
		var stdErr bytes.Buffer

		cmd := s.commandFunc(
			ctx,
			"ffmpeg",
			"-hide_banner",
			"-loglevel", "error",
			"-f", "s16le",
			"-ac", itoa(int(s.channels)),
			"-ar", itoa(rawAudioSampleRate),
			"-i", "-",
			"-f", s.outFormat.String(),
			"-",
		)
		cmd.Stderr = &stdErr
		cmd.Stdin = s
		cmd.Stdout = s

		// Wrap with %w so callers can inspect the cause with errors.Is and
		// errors.As; the rendered message is unchanged.
		if err := cmd.Start(); err != nil {
			return fmt.Errorf("error starting command: %w, output: %s", err, stdErr.String())
		}
		if err := cmd.Wait(); err != nil {
			return fmt.Errorf("error waiting for ffmpeg: %w, output: %s", err, stdErr.String())
		}
		return nil
	})
	if err != nil {
		s.stream.closeWithError(err)
		return
	}

	s.stream.close()
}
// itoa formats i as a base-10 string. It is a short alias that keeps long
// command-line argument lists readable.
func itoa(i int) string {
	return strconv.FormatInt(int64(i), 10)
}