segmento/pkg/generator/ffmpeg.go

153 lines
3.6 KiB
Go

package generator
import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"os/exec"
"path/filepath"
"segmenta/pkg/handler"
"strings"
"sync"
"gopkg.in/fsnotify.v1"
)
const AppName = "segmenta"
const DefaultPlaylistFilename = "playlist.m3u8"
// Generator generates assets for a given stream. The default implementation
// segments using FFMPEG.
type FFMPEGGenerator struct {
name string
src io.Reader
handlers map[handler.Handler]bool
}
func NewFFMPEGGenerator(name string, src io.Reader) *FFMPEGGenerator {
return &FFMPEGGenerator{
name: name,
src: src,
handlers: make(map[handler.Handler]bool),
}
}
func (g *FFMPEGGenerator) Generate() error {
// first, create a temporary directory.
prefix := fmt.Sprintf("%s-%s", AppName, g.name)
tmpdir, err := ioutil.TempDir("", prefix)
if err != nil {
return fmt.Errorf("ffmpeg: error initializing TempDir: %v", tmpdir)
}
defer os.RemoveAll(tmpdir)
playlist := filepath.Join(tmpdir, DefaultPlaylistFilename)
// FIXME: Better concurrency management is needed here. We should expect doMMPEG to run
// indefinitely, and its eventual closure (successful or otherwise) to trigger closure
// of this main process, including other goroutines.
// one instance of doWatch should be running at all times until the FFMPEG goroutine is
// closed. If it is interrupted for some reason before doFFMPEG has terminated, then
// this is a bug. It should be restarted.
var wg sync.WaitGroup
go g.doFFMPEG(&wg, playlist)
go g.doWatch(&wg, tmpdir)
wg.Add(2)
wg.Wait()
return nil
}
func (g *FFMPEGGenerator) AddHandler(h handler.Handler) {
g.handlers[h] = true
}
func (g *FFMPEGGenerator) RemoveHandler(h handler.Handler) {
delete(g.handlers, h)
}
// TODO should wg be passed in a Context?
func (g *FFMPEGGenerator) doFFMPEG(wg *sync.WaitGroup, dest string) {
defer wg.Done()
cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", "-hls_flags", "delete_segments", dest)
cmd.Stdin = g.src
if err := cmd.Run(); err != nil {
// TODO: error handling
log.Fatalf("ffmpeg: error running cmd: %v", err)
}
fmt.Println("ffmpeg: exited")
}
func (g *FFMPEGGenerator) doWatch(wg *sync.WaitGroup, srcDir string) {
defer wg.Done()
watcher, err := fsnotify.NewWatcher()
if err != nil {
// TODO: error handling
log.Fatal(err)
}
defer watcher.Close()
err = watcher.Add(srcDir)
if err != nil {
log.Fatal(err)
}
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
key := filepath.Join(g.name, filepath.Base(event.Name))
if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") {
asset := handler.Asset{
Key: key,
Type: handler.MediaPlaylist,
ContentType: "vnd.apple.mpegURL",
Path: event.Name,
}
for h, _ := range g.handlers {
h.AssetAdded(asset)
}
}
if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") {
asset := handler.Asset{
Key: key,
Type: handler.Segment,
ContentType: "video/MP2T",
Path: event.Name,
}
for h, _ := range g.handlers {
h.AssetAdded(asset)
}
}
if event.Op&fsnotify.Remove == fsnotify.Remove && strings.HasSuffix(event.Name, ".ts") {
asset := handler.Asset{
Key: key,
Type: handler.Segment,
ContentType: "video/MP2T",
Path: event.Name,
}
for h, _ := range g.handlers {
h.AssetRemoved(asset)
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("Got error:", err)
}
}
fmt.Println("watch: exited")
}