diff --git a/pkg/generator/ffmpeg.go b/pkg/generator/ffmpeg.go index 3c6a1c1..878f75e 100644 --- a/pkg/generator/ffmpeg.go +++ b/pkg/generator/ffmpeg.go @@ -47,9 +47,10 @@ func (g *FFMPEGGenerator) Generate() error { // FIXME: Better concurrency management is needed here. We should expect doMMPEG to run // indefinitely, and its eventual closure (successful or otherwise) to trigger closure - // of this process, including other goroutines. + // of this main process, including other goroutines. // one instance of doWatch should be running at all times until the FFMPEG goroutine is - // closed. + // closed. If it is interrupted for some reason before doFFMPEG has terminated, then + // this is a bug. It should be restarted. var wg sync.WaitGroup go g.doFFMPEG(&wg, playlist) @@ -73,7 +74,7 @@ func (g *FFMPEGGenerator) RemoveHandler(h handler.Handler) { func (g *FFMPEGGenerator) doFFMPEG(wg *sync.WaitGroup, dest string) { defer wg.Done() - cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", "-t", "10", dest) + cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", "-hls_flags", "delete_segments", dest) cmd.Stdin = g.src if err := cmd.Run(); err != nil { @@ -107,10 +108,37 @@ func (g *FFMPEGGenerator) doWatch(wg *sync.WaitGroup, srcDir string) { } key := filepath.Join(g.name, filepath.Base(event.Name)) if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") { - fmt.Println("New playlist:", event.Name, "upload to:", key) + asset := handler.Asset{ + Key: key, + Type: handler.MediaPlaylist, + ContentType: "vnd.apple.mpegURL", + Path: event.Name, + } + for h, _ := range g.handlers { + h.AssetAdded(asset) + } } if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") { - fmt.Println("New segment:", event.Name, "upload to:", key) + asset := handler.Asset{ + Key: key, + Type: handler.Segment, + ContentType: "video/MP2T", + Path: event.Name, + } + for h, _ := range g.handlers { + h.AssetAdded(asset) + } + } + if event.Op&fsnotify.Remove == fsnotify.Remove && strings.HasSuffix(event.Name, ".ts") { + asset := handler.Asset{ + Key: key, + Type: handler.Segment, + ContentType: "video/MP2T", + Path: event.Name, + } + for h, _ := range g.handlers { + h.AssetRemoved(asset) + } } case err, ok := <-watcher.Errors: if !ok { diff --git a/pkg/generator/generator.go b/pkg/generator/generator.go index 643ebe2..b724842 100644 --- a/pkg/generator/generator.go +++ b/pkg/generator/generator.go @@ -4,6 +4,8 @@ import ( "segmenta/pkg/handler" ) +// Generator implements behaviour for generating segments from some incoming media stream. +// TODO: possibly rename to Segmenter? // TODO: define or not? type Generator interface { Generate() error diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index 749e967..91ef25a 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -1,9 +1,5 @@ package handler -import ( - "io" -) - type AssetType int const ( @@ -13,14 +9,18 @@ const ( ) type Asset struct { - Name string - Key string + Path string + Key string // should this be passed with the asset? or decided upon by the handler? Type AssetType ContentType string - Body io.Reader } type Handler interface { AssetAdded(a Asset) error + + // TODO it is probably better to remove this because it will be difficult to ensure + // that all old assets are always cleaned up (it would be easy for this callback to be missed, + // for example). Better for implementations to be responsible for cleaning up old files, either + // using an implementation-specific method (e.g. S3 expiring objects) or just a periodic check. AssetRemoved(a Asset) error } diff --git a/pkg/handler/s3.go b/pkg/handler/s3.go index 7ced8d0..667bcd5 100644 --- a/pkg/handler/s3.go +++ b/pkg/handler/s3.go @@ -2,6 +2,7 @@ package handler import ( "fmt" + "os" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -41,17 +42,23 @@ func NewS3Handler(bucket string, acl string) *S3Handler { } func (h *S3Handler) AssetAdded(a Asset) error { + f, err := os.Open(a.Path) + if err != nil { + return err + } + defer f.Close() + result, err := s3Uploader.Upload(&s3manager.UploadInput{ Key: aws.String(a.Key), Bucket: aws.String(h.Bucket), ACL: aws.String(h.ACL), - Body: a.Body, + Body: f, }) if err != nil { return err } - fmt.Println("Uploaded", a.Name, "to", result.Location) + fmt.Println("Uploaded", a.Path, "to", result.Location) return nil } @@ -64,6 +71,8 @@ func (h *S3Handler) AssetRemoved(a Asset) error { Bucket: aws.String(h.Bucket), }) + fmt.Println("Deleted", a.Key) + if err != nil { return err }