Further refactoring.

This commit is contained in:
Rob Watson 2020-07-13 14:24:28 +02:00
parent 15fe812d1d
commit edf807851b
4 changed files with 53 additions and 14 deletions

View File

@ -47,9 +47,10 @@ func (g *FFMPEGGenerator) Generate() error {
// FIXME: Better concurrency management is needed here. We should expect doMMPEG to run // FIXME: Better concurrency management is needed here. We should expect doMMPEG to run
// indefinitely, and its eventual closure (successful or otherwise) to trigger closure // indefinitely, and its eventual closure (successful or otherwise) to trigger closure
// of this process, including other goroutines. // of this main process, including other goroutines.
// one instance of doWatch should be running at all times until the FFMPEG goroutine is // one instance of doWatch should be running at all times until the FFMPEG goroutine is
// closed. // closed. If it is interrupted for some reason before doFFMPEG has terminated, then
// this is a bug. It should be restarted.
var wg sync.WaitGroup var wg sync.WaitGroup
go g.doFFMPEG(&wg, playlist) go g.doFFMPEG(&wg, playlist)
@ -73,7 +74,7 @@ func (g *FFMPEGGenerator) RemoveHandler(h handler.Handler) {
func (g *FFMPEGGenerator) doFFMPEG(wg *sync.WaitGroup, dest string) { func (g *FFMPEGGenerator) doFFMPEG(wg *sync.WaitGroup, dest string) {
defer wg.Done() defer wg.Done()
cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", "-t", "10", dest) cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", "-hls_flags", "delete_segments", dest)
cmd.Stdin = g.src cmd.Stdin = g.src
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
@ -107,10 +108,37 @@ func (g *FFMPEGGenerator) doWatch(wg *sync.WaitGroup, srcDir string) {
} }
key := filepath.Join(g.name, filepath.Base(event.Name)) key := filepath.Join(g.name, filepath.Base(event.Name))
if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") { if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") {
fmt.Println("New playlist:", event.Name, "upload to:", key) asset := handler.Asset{
Key: key,
Type: handler.MediaPlaylist,
ContentType: "vnd.apple.mpegURL",
Path: event.Name,
}
for h, _ := range g.handlers {
h.AssetAdded(asset)
}
} }
if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") { if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") {
fmt.Println("New segment:", event.Name, "upload to:", key) asset := handler.Asset{
Key: key,
Type: handler.Segment,
ContentType: "video/MP2T",
Path: event.Name,
}
for h, _ := range g.handlers {
h.AssetAdded(asset)
}
}
if event.Op&fsnotify.Remove == fsnotify.Remove && strings.HasSuffix(event.Name, ".ts") {
asset := handler.Asset{
Key: key,
Type: handler.Segment,
ContentType: "video/MP2T",
Path: event.Name,
}
for h, _ := range g.handlers {
h.AssetRemoved(asset)
}
} }
case err, ok := <-watcher.Errors: case err, ok := <-watcher.Errors:
if !ok { if !ok {

View File

@ -4,6 +4,8 @@ import (
"segmenta/pkg/handler" "segmenta/pkg/handler"
) )
// Generator implements behaviour for generating segments from some incoming media stream.
// TODO: possibly rename to Segmenter?
// TODO: define or not? // TODO: define or not?
type Generator interface { type Generator interface {
Generate() error Generate() error

View File

@ -1,9 +1,5 @@
package handler package handler
import (
"io"
)
type AssetType int type AssetType int
const ( const (
@ -13,14 +9,18 @@ const (
) )
type Asset struct { type Asset struct {
Name string Path string
Key string Key string // should this be passed with the asset? or decided upon by the handler?
Type AssetType Type AssetType
ContentType string ContentType string
Body io.Reader
} }
type Handler interface { type Handler interface {
AssetAdded(a Asset) error AssetAdded(a Asset) error
// TODO it is probably better to remove this because it will be difficult to ensure
// that all old assets are always cleaned up (it would be easy for this callback to be missed,
// for example). Better for implementations to be responsible for cleaning up old files, either
// using an implementation-specific method (e.g. S3 expiring objects) or just a periodic check.
AssetRemoved(a Asset) error AssetRemoved(a Asset) error
} }

View File

@ -2,6 +2,7 @@ package handler
import ( import (
"fmt" "fmt"
"os"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
@ -41,17 +42,23 @@ func NewS3Handler(bucket string, acl string) *S3Handler {
} }
func (h *S3Handler) AssetAdded(a Asset) error { func (h *S3Handler) AssetAdded(a Asset) error {
f, err := os.Open(a.Path)
if err != nil {
return err
}
defer f.Close()
result, err := s3Uploader.Upload(&s3manager.UploadInput{ result, err := s3Uploader.Upload(&s3manager.UploadInput{
Key: aws.String(a.Key), Key: aws.String(a.Key),
Bucket: aws.String(h.Bucket), Bucket: aws.String(h.Bucket),
ACL: aws.String(h.ACL), ACL: aws.String(h.ACL),
Body: a.Body, Body: f,
}) })
if err != nil { if err != nil {
return err return err
} }
fmt.Println("Uploaded", a.Name, "to", result.Location) fmt.Println("Uploaded", a.Path, "to", result.Location)
return nil return nil
} }
@ -64,6 +71,8 @@ func (h *S3Handler) AssetRemoved(a Asset) error {
Bucket: aws.String(h.Bucket), Bucket: aws.String(h.Bucket),
}) })
fmt.Println("Deleted", a.Key)
if err != nil { if err != nil {
return err return err
} }