diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..bc21a71 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module segmenta + +go 1.14 + +require ( + github.com/aws/aws-sdk-go v1.33.5 + golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect + gopkg.in/fsnotify.v1 v1.4.7 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c31df7a --- /dev/null +++ b/go.sum @@ -0,0 +1,20 @@ +github.com/aws/aws-sdk-go v1.33.5 h1:p2fr1ryvNTU6avUWLI+/H7FGv0TBIjzVM5WDgXBBv4U= +github.com/aws/aws-sdk-go v1.33.5/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo= +golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 0000000..14c0ee2 --- /dev/null +++ b/main.go @@ -0,0 +1,150 @@ +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "log" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "gopkg.in/fsnotify.v1" +) + +// How to stream a static video file as a "live" stream? + +var s3Session *session.Session +var s3Uploader *s3manager.Uploader + +func init() { + var err error + s3Session, err = session.NewSession(&aws.Config{Region: aws.String("eu-west-1")}) + if err != nil { + panic(err) + } + + s3Uploader = s3manager.NewUploader(s3Session) +} + +type upload struct { + fname string + key string +} + +func main() { + var url string + + flag.StringVar(&url, "url", "", "URL to open") + flag.Parse() + + if url == "" { + fmt.Println("Usage:") + flag.PrintDefaults() + os.Exit(-1) + } + + // first, create a temporary directory. + tmpdir, err := ioutil.TempDir("", "playlist") + if err != nil { + log.Fatal(err) + } + defer os.RemoveAll(tmpdir) + target := filepath.Join(tmpdir, "out.m3u8") + + // open URL: + client := http.Client{} + resp, err := client.Get(url) + if err != nil { + log.Fatal(err) + } + defer resp.Body.Close() + + var wg sync.WaitGroup + + go func() { + // in this goroutine we open an HTTP URL and pipe it into the process: + defer wg.Done() + + fmt.Println("Set target:", target) + + cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", target) + cmd.Stdin = resp.Body + + err = cmd.Run() + if err != nil { + log.Fatal(err) + } + + fmt.Println("ffmpeg exited") + }() + + go func() { + defer wg.Done() + + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Fatal(err) + } + defer watcher.Close() + + err = watcher.Add(tmpdir) + if err != nil { + panic(err) + } + + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + log.Println("Got event.Name:", event.Name) + key := filepath.Join("test-stream", filepath.Base(event.Name)) + if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") { + uploadFile(upload{event.Name, key}) + } + if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") { + uploadFile(upload{event.Name, key}) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.Println("Got error:", err) + } + } + + fmt.Println("sleep exited") + }() + + wg.Add(2) + wg.Wait() + + fmt.Println("exiting") +} + +func uploadFile(u upload) error { + f, err := os.Open(u.fname) + if err != nil { + panic(err) + } + defer f.Close() + + result, err := s3Uploader.Upload(&s3manager.UploadInput{ + Key: aws.String(u.key), + Bucket: aws.String("rfwatson-hls"), + ACL: aws.String("public-read"), + Body: f, + }) + + fmt.Println("Uploaded", u.fname, "to", result.Location) + + return err +}