package generator import ( "fmt" "io" "io/ioutil" "log" "os" "os/exec" "path/filepath" "segmenta/pkg/handler" "strings" "sync" "gopkg.in/fsnotify.v1" ) const AppName = "segmenta" const DefaultPlaylistFilename = "playlist.m3u8" // Generator generates assets for a given stream. The default implementation // segments using FFMPEG. type FFMPEGGenerator struct { name string src io.Reader handlers map[handler.Handler]bool } func NewFFMPEGGenerator(name string, src io.Reader) *FFMPEGGenerator { return &FFMPEGGenerator{ name: name, src: src, handlers: make(map[handler.Handler]bool), } } func (g *FFMPEGGenerator) Generate() error { // first, create a temporary directory. prefix := fmt.Sprintf("%s-%s", AppName, g.name) tmpdir, err := ioutil.TempDir("", prefix) if err != nil { return fmt.Errorf("ffmpeg: error initializing TempDir: %v", tmpdir) } defer os.RemoveAll(tmpdir) playlist := filepath.Join(tmpdir, DefaultPlaylistFilename) // FIXME: Better concurrency management is needed here. We should expect doMMPEG to run // indefinitely, and its eventual closure (successful or otherwise) to trigger closure // of this main process, including other goroutines. // one instance of doWatch should be running at all times until the FFMPEG goroutine is // closed. If it is interrupted for some reason before doFFMPEG has terminated, then // this is a bug. It should be restarted. var wg sync.WaitGroup go g.doFFMPEG(&wg, playlist) go g.doWatch(&wg, tmpdir) wg.Add(2) wg.Wait() return nil } func (g *FFMPEGGenerator) AddHandler(h handler.Handler) { g.handlers[h] = true } func (g *FFMPEGGenerator) RemoveHandler(h handler.Handler) { delete(g.handlers, h) } // TODO should wg be passed in a Context? func (g *FFMPEGGenerator) doFFMPEG(wg *sync.WaitGroup, dest string) { defer wg.Done() cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", "-hls_flags", "delete_segments", dest) cmd.Stdin = g.src if err := cmd.Run(); err != nil { // TODO: error handling log.Fatalf("ffmpeg: error running cmd: %v", err) } fmt.Println("ffmpeg: exited") } func (g *FFMPEGGenerator) doWatch(wg *sync.WaitGroup, srcDir string) { defer wg.Done() watcher, err := fsnotify.NewWatcher() if err != nil { // TODO: error handling log.Fatal(err) } defer watcher.Close() err = watcher.Add(srcDir) if err != nil { log.Fatal(err) } for { select { case event, ok := <-watcher.Events: if !ok { return } key := filepath.Join(g.name, filepath.Base(event.Name)) if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") { asset := handler.Asset{ Key: key, Type: handler.MediaPlaylist, ContentType: "vnd.apple.mpegURL", Path: event.Name, } for h, _ := range g.handlers { h.AssetAdded(asset) } } if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") { asset := handler.Asset{ Key: key, Type: handler.Segment, ContentType: "video/MP2T", Path: event.Name, } for h, _ := range g.handlers { h.AssetAdded(asset) } } if event.Op&fsnotify.Remove == fsnotify.Remove && strings.HasSuffix(event.Name, ".ts") { asset := handler.Asset{ Key: key, Type: handler.Segment, ContentType: "video/MP2T", Path: event.Name, } for h, _ := range g.handlers { h.AssetRemoved(asset) } } case err, ok := <-watcher.Errors: if !ok { return } log.Println("Got error:", err) } } fmt.Println("watch: exited") }