From 201c551aea41d01fc75dd913e2e4038e4e69ee19 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Fri, 10 Jul 2020 23:33:43 +0200 Subject: [PATCH] Continue refactor. Add generator and handler concepts. --- go.mod | 1 + go.sum | 4 + main.go | 161 +++++++++++------------------------ pkg/generator/ffmpeg.go | 76 +++++++++++++++++ pkg/generator/ffmpeg_test.go | 13 +++ pkg/generator/generator.go | 16 ++++ pkg/handler/handler.go | 26 ++++++ pkg/handler/s3.go | 72 ++++++++++++++++ pkg/handler/s3_test.go | 12 +++ 9 files changed, 272 insertions(+), 109 deletions(-) create mode 100644 pkg/generator/ffmpeg.go create mode 100644 pkg/generator/ffmpeg_test.go create mode 100644 pkg/generator/generator.go create mode 100644 pkg/handler/handler.go create mode 100644 pkg/handler/s3.go create mode 100644 pkg/handler/s3_test.go diff --git a/go.mod b/go.mod index bc21a71..6261a33 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.14 require ( github.com/aws/aws-sdk-go v1.33.5 + github.com/stretchr/testify v1.5.1 golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect gopkg.in/fsnotify.v1 v1.4.7 ) diff --git a/go.sum b/go.sum index c31df7a..5b98130 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,15 @@ github.com/aws/aws-sdk-go v1.33.5 h1:p2fr1ryvNTU6avUWLI+/H7FGv0TBIjzVM5WDgXBBv4U= github.com/aws/aws-sdk-go v1.33.5/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -17,4 +20,5 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go index 14c0ee2..4a575b4 100644 --- a/main.go +++ b/main.go @@ -3,41 +3,15 @@ package main import ( "flag" "fmt" - "io/ioutil" "log" "net/http" "os" - "os/exec" - "path/filepath" - "strings" - "sync" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3/s3manager" - "gopkg.in/fsnotify.v1" + "segmenta/pkg/generator" + "segmenta/pkg/handler" ) // How to stream a static video file as a "live" stream? -var s3Session *session.Session -var s3Uploader *s3manager.Uploader - -func init() { - var err error - s3Session, err = session.NewSession(&aws.Config{Region: aws.String("eu-west-1")}) - if err != nil { - panic(err) - } - - s3Uploader = s3manager.NewUploader(s3Session) -} - -type upload struct { - fname string - key string -} - func main() { var url string @@ -50,14 +24,6 @@ func main() { os.Exit(-1) } - // first, create a temporary directory. - tmpdir, err := ioutil.TempDir("", "playlist") - if err != nil { - log.Fatal(err) - } - defer os.RemoveAll(tmpdir) - target := filepath.Join(tmpdir, "out.m3u8") - // open URL: client := http.Client{} resp, err := client.Get(url) @@ -66,85 +32,62 @@ func main() { } defer resp.Body.Close() - var wg sync.WaitGroup + handler := handler.NewS3Handler("rfwatson-hls", "public-read") + generator := generator.NewFFMPEGGenerator("test", resp.Body) + generator.AddHandler(handler) - go func() { - // in this goroutine we open an HTTP URL and pipe it into the process: - defer wg.Done() + fmt.Println("starting generator...") - fmt.Println("Set target:", target) + err = generator.Generate() - cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", target) - cmd.Stdin = resp.Body - - err = cmd.Run() - if err != nil { - log.Fatal(err) - } - - fmt.Println("ffmpeg exited") - }() - - go func() { - defer wg.Done() - - watcher, err := fsnotify.NewWatcher() - if err != nil { - log.Fatal(err) - } - defer watcher.Close() - - err = watcher.Add(tmpdir) - if err != nil { - panic(err) - } - - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - log.Println("Got event.Name:", event.Name) - key := filepath.Join("test-stream", filepath.Base(event.Name)) - if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") { - uploadFile(upload{event.Name, key}) - } - if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") { - uploadFile(upload{event.Name, key}) - } - case err, ok := <-watcher.Errors: - if !ok { - return - } - log.Println("Got error:", err) - } - } - - fmt.Println("sleep exited") - }() - - wg.Add(2) - wg.Wait() - - fmt.Println("exiting") -} - -func uploadFile(u upload) error { - f, err := os.Open(u.fname) if err != nil { - panic(err) + log.Fatal(err) } - defer f.Close() - result, err := s3Uploader.Upload(&s3manager.UploadInput{ - Key: aws.String(u.key), - Bucket: aws.String("rfwatson-hls"), - ACL: aws.String("public-read"), - Body: f, - }) + fmt.Println("completed generator...") - fmt.Println("Uploaded", u.fname, "to", result.Location) + //go func() { + //defer wg.Done() - return err + //watcher, err := fsnotify.NewWatcher() + //if err != nil { + //log.Fatal(err) + //} + //defer watcher.Close() + + //err = watcher.Add(tmpdir) + //if err != nil { + //panic(err) + //} + + //for { + //select { + //case event, ok := <-watcher.Events: + //if !ok { + //return + //} + //log.Println("Got event.Name:", event.Name) + //key := filepath.Join("test-stream", filepath.Base(event.Name)) + //if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") { + //uploadFile(upload{event.Name, key}) + //} + //if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") { + //uploadFile(upload{event.Name, key}) + //} + //case err, ok := <-watcher.Errors: + //if !ok { + //return + //} + //log.Println("Got error:", err) + //} + //} + + //fmt.Println("sleep exited") + //}() + + //wg.Add(2) + //wg.Wait() + + //fmt.Println("exiting") + //} } diff --git a/pkg/generator/ffmpeg.go b/pkg/generator/ffmpeg.go new file mode 100644 index 0000000..da7c054 --- /dev/null +++ b/pkg/generator/ffmpeg.go @@ -0,0 +1,76 @@ +package generator + +import ( + "fmt" + "io" + "io/ioutil" + "log" + "os" + "os/exec" + "path/filepath" + "segmenta/pkg/handler" + "sync" +) + +const AppName = "segmenta" +const DefaultPlaylistFilename = "playlist.m3u8" + +// Generator generates assets for a given stream. The default implementation +// segments using FFMPEG. +type FFMPEGGenerator struct { + name string + src io.Reader + handlers map[handler.Handler]bool +} + +func NewFFMPEGGenerator(name string, src io.Reader) *FFMPEGGenerator { + return &FFMPEGGenerator{ + name: name, + src: src, + handlers: make(map[handler.Handler]bool), + } +} + +func (g *FFMPEGGenerator) Generate() error { + // first, create a temporary directory. + prefix := fmt.Sprintf("%s-%s", AppName, g.name) + tmpdir, err := ioutil.TempDir("", prefix) + if err != nil { + return fmt.Errorf("ffmpeg: error initializing TempDir: %v", tmpdir) + } + defer os.RemoveAll(tmpdir) + + playlist := filepath.Join(tmpdir, DefaultPlaylistFilename) + + var wg sync.WaitGroup + + go g.doFFMPEG(&wg, playlist) + + wg.Add(1) + wg.Wait() + + return nil +} + +func (g *FFMPEGGenerator) AddHandler(h handler.Handler) { + g.handlers[h] = true +} + +func (g *FFMPEGGenerator) RemoveHandler(h handler.Handler) { + delete(g.handlers, h) +} + +// TODO should wg be passed in a Context? +func (g *FFMPEGGenerator) doFFMPEG(wg *sync.WaitGroup, dest string) { + defer wg.Done() + + cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", "-t", "10", dest) + cmd.Stdin = g.src + + if err := cmd.Run(); err != nil { + // TODO: error handling + log.Fatalf("ffmpeg: error running cmd: %v", err) + } + + fmt.Println("ffmpeg: exited") +} diff --git a/pkg/generator/ffmpeg_test.go b/pkg/generator/ffmpeg_test.go new file mode 100644 index 0000000..7168ccb --- /dev/null +++ b/pkg/generator/ffmpeg_test.go @@ -0,0 +1,13 @@ +package generator_test + +import ( + "segmenta/pkg/generator" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFFMPEGGeneratorImplementsGenerator(t *testing.T) { + require.Implements(t, (*generator.Generator)(nil), new(generator.FFMPEGGenerator)) + require.Implements(t, (*generator.Handled)(nil), new(generator.FFMPEGGenerator)) +} diff --git a/pkg/generator/generator.go b/pkg/generator/generator.go new file mode 100644 index 0000000..643ebe2 --- /dev/null +++ b/pkg/generator/generator.go @@ -0,0 +1,16 @@ +package generator + +import ( + "segmenta/pkg/handler" +) + +// TODO: define or not? +type Generator interface { + Generate() error +} + +// TODO: define or not? +type Handled interface { + AddHandler(h handler.Handler) + RemoveHandler(h handler.Handler) +} diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go new file mode 100644 index 0000000..749e967 --- /dev/null +++ b/pkg/handler/handler.go @@ -0,0 +1,26 @@ +package handler + +import ( + "io" +) + +type AssetType int + +const ( + Segment AssetType = 0 + MediaPlaylist + MasterPlaylist +) + +type Asset struct { + Name string + Key string + Type AssetType + ContentType string + Body io.Reader +} + +type Handler interface { + AssetAdded(a Asset) error + AssetRemoved(a Asset) error +} diff --git a/pkg/handler/s3.go b/pkg/handler/s3.go new file mode 100644 index 0000000..7ced8d0 --- /dev/null +++ b/pkg/handler/s3.go @@ -0,0 +1,72 @@ +package handler + +import ( + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" +) + +const DefaultAwsRegion = "eu-west-1" + +var ( + s3Session *session.Session + s3Client *s3.S3 + s3Uploader *s3manager.Uploader +) + +func init() { + var err error + s3Session, err = session.NewSession(&aws.Config{Region: aws.String(DefaultAwsRegion)}) + if err != nil { + panic(err) + } + + s3Client = s3.New(s3Session) + s3Uploader = s3manager.NewUploader(s3Session) +} + +type S3Handler struct { + Bucket string + ACL string +} + +func NewS3Handler(bucket string, acl string) *S3Handler { + return &S3Handler{ + Bucket: bucket, + ACL: acl, + } +} + +func (h *S3Handler) AssetAdded(a Asset) error { + result, err := s3Uploader.Upload(&s3manager.UploadInput{ + Key: aws.String(a.Key), + Bucket: aws.String(h.Bucket), + ACL: aws.String(h.ACL), + Body: a.Body, + }) + if err != nil { + return err + } + + fmt.Println("Uploaded", a.Name, "to", result.Location) + + return nil +} + +func (h *S3Handler) AssetRemoved(a Asset) error { + // TODO consider cleaning up assets every ~60 seconds? + // https://github.com/aws/aws-sdk-go/blob/v1.33.4/service/s3/s3manager/batch.go#L255 + _, err := s3Client.DeleteObject(&s3.DeleteObjectInput{ + Key: aws.String(a.Key), + Bucket: aws.String(h.Bucket), + }) + + if err != nil { + return err + } + + return nil +} diff --git a/pkg/handler/s3_test.go b/pkg/handler/s3_test.go new file mode 100644 index 0000000..0322ca6 --- /dev/null +++ b/pkg/handler/s3_test.go @@ -0,0 +1,12 @@ +package handler_test + +import ( + "segmenta/pkg/handler" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestS3HandlerImplementsHandler(t *testing.T) { + require.Implements(t, (*handler.Handler)(nil), new(handler.S3Handler)) +}