From 15fe812d1dacd1d083be9b20386cf5411b3a0bcb Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Sat, 11 Jul 2020 00:40:33 +0200 Subject: [PATCH] Continue implementation of ffmpeg generator --- main.go | 45 ------------------------------------- pkg/generator/ffmpeg.go | 50 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 46 deletions(-) diff --git a/main.go b/main.go index 4a575b4..4edc854 100644 --- a/main.go +++ b/main.go @@ -45,49 +45,4 @@ func main() { } fmt.Println("completed generator...") - - //go func() { - //defer wg.Done() - - //watcher, err := fsnotify.NewWatcher() - //if err != nil { - //log.Fatal(err) - //} - //defer watcher.Close() - - //err = watcher.Add(tmpdir) - //if err != nil { - //panic(err) - //} - - //for { - //select { - //case event, ok := <-watcher.Events: - //if !ok { - //return - //} - //log.Println("Got event.Name:", event.Name) - //key := filepath.Join("test-stream", filepath.Base(event.Name)) - //if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") { - //uploadFile(upload{event.Name, key}) - //} - //if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") { - //uploadFile(upload{event.Name, key}) - //} - //case err, ok := <-watcher.Errors: - //if !ok { - //return - //} - //log.Println("Got error:", err) - //} - //} - - //fmt.Println("sleep exited") - //}() - - //wg.Add(2) - //wg.Wait() - - //fmt.Println("exiting") - //} } diff --git a/pkg/generator/ffmpeg.go b/pkg/generator/ffmpeg.go index da7c054..3c6a1c1 100644 --- a/pkg/generator/ffmpeg.go +++ b/pkg/generator/ffmpeg.go @@ -9,7 +9,10 @@ import ( "os/exec" "path/filepath" "segmenta/pkg/handler" + "strings" "sync" + + "gopkg.in/fsnotify.v1" ) const AppName = "segmenta" @@ -42,11 +45,17 @@ func (g *FFMPEGGenerator) Generate() error { playlist := filepath.Join(tmpdir, DefaultPlaylistFilename) + // FIXME: Better concurrency management is needed here. We should expect doMMPEG to run + // indefinitely, and its eventual closure (successful or otherwise) to trigger closure + // of this process, including other goroutines. + // one instance of doWatch should be running at all times until the FFMPEG goroutine is + // closed. var wg sync.WaitGroup go g.doFFMPEG(&wg, playlist) + go g.doWatch(&wg, tmpdir) - wg.Add(1) + wg.Add(2) wg.Wait() return nil @@ -74,3 +83,42 @@ func (g *FFMPEGGenerator) doFFMPEG(wg *sync.WaitGroup, dest string) { fmt.Println("ffmpeg: exited") } + +func (g *FFMPEGGenerator) doWatch(wg *sync.WaitGroup, srcDir string) { + defer wg.Done() + + watcher, err := fsnotify.NewWatcher() + if err != nil { + // TODO: error handling + log.Fatal(err) + } + defer watcher.Close() + + err = watcher.Add(srcDir) + if err != nil { + log.Fatal(err) + } + + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + key := filepath.Join(g.name, filepath.Base(event.Name)) + if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") { + fmt.Println("New playlist:", event.Name, "upload to:", key) + } + if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") { + fmt.Println("New segment:", event.Name, "upload to:", key) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.Println("Got error:", err) + } + } + + fmt.Println("watch: exited") +}