153 lines
3.6 KiB
Go
153 lines
3.6 KiB
Go
package generator
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"segmenta/pkg/handler"
|
|
"strings"
|
|
"sync"
|
|
|
|
"gopkg.in/fsnotify.v1"
|
|
)
|
|
|
|
const AppName = "segmenta"
|
|
const DefaultPlaylistFilename = "playlist.m3u8"
|
|
|
|
// Generator generates assets for a given stream. The default implementation
|
|
// segments using FFMPEG.
|
|
type FFMPEGGenerator struct {
|
|
name string
|
|
src io.Reader
|
|
handlers map[handler.Handler]bool
|
|
}
|
|
|
|
func NewFFMPEGGenerator(name string, src io.Reader) *FFMPEGGenerator {
|
|
return &FFMPEGGenerator{
|
|
name: name,
|
|
src: src,
|
|
handlers: make(map[handler.Handler]bool),
|
|
}
|
|
}
|
|
|
|
func (g *FFMPEGGenerator) Generate() error {
|
|
// first, create a temporary directory.
|
|
prefix := fmt.Sprintf("%s-%s", AppName, g.name)
|
|
tmpdir, err := ioutil.TempDir("", prefix)
|
|
if err != nil {
|
|
return fmt.Errorf("ffmpeg: error initializing TempDir: %v", tmpdir)
|
|
}
|
|
defer os.RemoveAll(tmpdir)
|
|
|
|
playlist := filepath.Join(tmpdir, DefaultPlaylistFilename)
|
|
|
|
// FIXME: Better concurrency management is needed here. We should expect doMMPEG to run
|
|
// indefinitely, and its eventual closure (successful or otherwise) to trigger closure
|
|
// of this main process, including other goroutines.
|
|
// one instance of doWatch should be running at all times until the FFMPEG goroutine is
|
|
// closed. If it is interrupted for some reason before doFFMPEG has terminated, then
|
|
// this is a bug. It should be restarted.
|
|
var wg sync.WaitGroup
|
|
|
|
go g.doFFMPEG(&wg, playlist)
|
|
go g.doWatch(&wg, tmpdir)
|
|
|
|
wg.Add(2)
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (g *FFMPEGGenerator) AddHandler(h handler.Handler) {
|
|
g.handlers[h] = true
|
|
}
|
|
|
|
func (g *FFMPEGGenerator) RemoveHandler(h handler.Handler) {
|
|
delete(g.handlers, h)
|
|
}
|
|
|
|
// TODO should wg be passed in a Context?
|
|
func (g *FFMPEGGenerator) doFFMPEG(wg *sync.WaitGroup, dest string) {
|
|
defer wg.Done()
|
|
|
|
cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", "-hls_flags", "delete_segments", dest)
|
|
cmd.Stdin = g.src
|
|
|
|
if err := cmd.Run(); err != nil {
|
|
// TODO: error handling
|
|
log.Fatalf("ffmpeg: error running cmd: %v", err)
|
|
}
|
|
|
|
fmt.Println("ffmpeg: exited")
|
|
}
|
|
|
|
func (g *FFMPEGGenerator) doWatch(wg *sync.WaitGroup, srcDir string) {
|
|
defer wg.Done()
|
|
|
|
watcher, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
// TODO: error handling
|
|
log.Fatal(err)
|
|
}
|
|
defer watcher.Close()
|
|
|
|
err = watcher.Add(srcDir)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case event, ok := <-watcher.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
key := filepath.Join(g.name, filepath.Base(event.Name))
|
|
if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") {
|
|
asset := handler.Asset{
|
|
Key: key,
|
|
Type: handler.MediaPlaylist,
|
|
ContentType: "vnd.apple.mpegURL",
|
|
Path: event.Name,
|
|
}
|
|
for h, _ := range g.handlers {
|
|
h.AssetAdded(asset)
|
|
}
|
|
}
|
|
if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") {
|
|
asset := handler.Asset{
|
|
Key: key,
|
|
Type: handler.Segment,
|
|
ContentType: "video/MP2T",
|
|
Path: event.Name,
|
|
}
|
|
for h, _ := range g.handlers {
|
|
h.AssetAdded(asset)
|
|
}
|
|
}
|
|
if event.Op&fsnotify.Remove == fsnotify.Remove && strings.HasSuffix(event.Name, ".ts") {
|
|
asset := handler.Asset{
|
|
Key: key,
|
|
Type: handler.Segment,
|
|
ContentType: "video/MP2T",
|
|
Path: event.Name,
|
|
}
|
|
for h, _ := range g.handlers {
|
|
h.AssetRemoved(asset)
|
|
}
|
|
}
|
|
case err, ok := <-watcher.Errors:
|
|
if !ok {
|
|
return
|
|
}
|
|
log.Println("Got error:", err)
|
|
}
|
|
}
|
|
|
|
fmt.Println("watch: exited")
|
|
}
|