Continue refactor. Add generator and handler concepts.
This commit is contained in:
parent
67581421dc
commit
201c551aea
1
go.mod
1
go.mod
|
@ -4,6 +4,7 @@ go 1.14
|
|||
|
||||
require (
|
||||
github.com/aws/aws-sdk-go v1.33.5
|
||||
github.com/stretchr/testify v1.5.1
|
||||
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
|
||||
gopkg.in/fsnotify.v1 v1.4.7
|
||||
)
|
||||
|
|
4
go.sum
4
go.sum
|
@ -1,12 +1,15 @@
|
|||
github.com/aws/aws-sdk-go v1.33.5 h1:p2fr1ryvNTU6avUWLI+/H7FGv0TBIjzVM5WDgXBBv4U=
|
||||
github.com/aws/aws-sdk-go v1.33.5/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
|
||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
|
||||
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
|
@ -17,4 +20,5 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
|||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
|
161
main.go
161
main.go
|
@ -3,41 +3,15 @@ package main
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"gopkg.in/fsnotify.v1"
|
||||
"segmenta/pkg/generator"
|
||||
"segmenta/pkg/handler"
|
||||
)
|
||||
|
||||
// How to stream a static video file as a "live" stream?
|
||||
|
||||
var s3Session *session.Session
|
||||
var s3Uploader *s3manager.Uploader
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
s3Session, err = session.NewSession(&aws.Config{Region: aws.String("eu-west-1")})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
s3Uploader = s3manager.NewUploader(s3Session)
|
||||
}
|
||||
|
||||
type upload struct {
|
||||
fname string
|
||||
key string
|
||||
}
|
||||
|
||||
func main() {
|
||||
var url string
|
||||
|
||||
|
@ -50,14 +24,6 @@ func main() {
|
|||
os.Exit(-1)
|
||||
}
|
||||
|
||||
// first, create a temporary directory.
|
||||
tmpdir, err := ioutil.TempDir("", "playlist")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpdir)
|
||||
target := filepath.Join(tmpdir, "out.m3u8")
|
||||
|
||||
// open URL:
|
||||
client := http.Client{}
|
||||
resp, err := client.Get(url)
|
||||
|
@ -66,85 +32,62 @@ func main() {
|
|||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
handler := handler.NewS3Handler("rfwatson-hls", "public-read")
|
||||
generator := generator.NewFFMPEGGenerator("test", resp.Body)
|
||||
generator.AddHandler(handler)
|
||||
|
||||
go func() {
|
||||
// in this goroutine we open an HTTP URL and pipe it into the process:
|
||||
defer wg.Done()
|
||||
fmt.Println("starting generator...")
|
||||
|
||||
fmt.Println("Set target:", target)
|
||||
err = generator.Generate()
|
||||
|
||||
cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", target)
|
||||
cmd.Stdin = resp.Body
|
||||
|
||||
err = cmd.Run()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
fmt.Println("ffmpeg exited")
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer watcher.Close()
|
||||
|
||||
err = watcher.Add(tmpdir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-watcher.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
log.Println("Got event.Name:", event.Name)
|
||||
key := filepath.Join("test-stream", filepath.Base(event.Name))
|
||||
if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") {
|
||||
uploadFile(upload{event.Name, key})
|
||||
}
|
||||
if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") {
|
||||
uploadFile(upload{event.Name, key})
|
||||
}
|
||||
case err, ok := <-watcher.Errors:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
log.Println("Got error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("sleep exited")
|
||||
}()
|
||||
|
||||
wg.Add(2)
|
||||
wg.Wait()
|
||||
|
||||
fmt.Println("exiting")
|
||||
}
|
||||
|
||||
func uploadFile(u upload) error {
|
||||
f, err := os.Open(u.fname)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
result, err := s3Uploader.Upload(&s3manager.UploadInput{
|
||||
Key: aws.String(u.key),
|
||||
Bucket: aws.String("rfwatson-hls"),
|
||||
ACL: aws.String("public-read"),
|
||||
Body: f,
|
||||
})
|
||||
fmt.Println("completed generator...")
|
||||
|
||||
fmt.Println("Uploaded", u.fname, "to", result.Location)
|
||||
//go func() {
|
||||
//defer wg.Done()
|
||||
|
||||
return err
|
||||
//watcher, err := fsnotify.NewWatcher()
|
||||
//if err != nil {
|
||||
//log.Fatal(err)
|
||||
//}
|
||||
//defer watcher.Close()
|
||||
|
||||
//err = watcher.Add(tmpdir)
|
||||
//if err != nil {
|
||||
//panic(err)
|
||||
//}
|
||||
|
||||
//for {
|
||||
//select {
|
||||
//case event, ok := <-watcher.Events:
|
||||
//if !ok {
|
||||
//return
|
||||
//}
|
||||
//log.Println("Got event.Name:", event.Name)
|
||||
//key := filepath.Join("test-stream", filepath.Base(event.Name))
|
||||
//if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") {
|
||||
//uploadFile(upload{event.Name, key})
|
||||
//}
|
||||
//if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") {
|
||||
//uploadFile(upload{event.Name, key})
|
||||
//}
|
||||
//case err, ok := <-watcher.Errors:
|
||||
//if !ok {
|
||||
//return
|
||||
//}
|
||||
//log.Println("Got error:", err)
|
||||
//}
|
||||
//}
|
||||
|
||||
//fmt.Println("sleep exited")
|
||||
//}()
|
||||
|
||||
//wg.Add(2)
|
||||
//wg.Wait()
|
||||
|
||||
//fmt.Println("exiting")
|
||||
//}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
package generator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"segmenta/pkg/handler"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const AppName = "segmenta"
|
||||
const DefaultPlaylistFilename = "playlist.m3u8"
|
||||
|
||||
// Generator generates assets for a given stream. The default implementation
|
||||
// segments using FFMPEG.
|
||||
type FFMPEGGenerator struct {
|
||||
name string
|
||||
src io.Reader
|
||||
handlers map[handler.Handler]bool
|
||||
}
|
||||
|
||||
func NewFFMPEGGenerator(name string, src io.Reader) *FFMPEGGenerator {
|
||||
return &FFMPEGGenerator{
|
||||
name: name,
|
||||
src: src,
|
||||
handlers: make(map[handler.Handler]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (g *FFMPEGGenerator) Generate() error {
|
||||
// first, create a temporary directory.
|
||||
prefix := fmt.Sprintf("%s-%s", AppName, g.name)
|
||||
tmpdir, err := ioutil.TempDir("", prefix)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ffmpeg: error initializing TempDir: %v", tmpdir)
|
||||
}
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
playlist := filepath.Join(tmpdir, DefaultPlaylistFilename)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
go g.doFFMPEG(&wg, playlist)
|
||||
|
||||
wg.Add(1)
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *FFMPEGGenerator) AddHandler(h handler.Handler) {
|
||||
g.handlers[h] = true
|
||||
}
|
||||
|
||||
func (g *FFMPEGGenerator) RemoveHandler(h handler.Handler) {
|
||||
delete(g.handlers, h)
|
||||
}
|
||||
|
||||
// TODO should wg be passed in a Context?
|
||||
func (g *FFMPEGGenerator) doFFMPEG(wg *sync.WaitGroup, dest string) {
|
||||
defer wg.Done()
|
||||
|
||||
cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", "-t", "10", dest)
|
||||
cmd.Stdin = g.src
|
||||
|
||||
if err := cmd.Run(); err != nil {
|
||||
// TODO: error handling
|
||||
log.Fatalf("ffmpeg: error running cmd: %v", err)
|
||||
}
|
||||
|
||||
fmt.Println("ffmpeg: exited")
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package generator_test
|
||||
|
||||
import (
|
||||
"segmenta/pkg/generator"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestFFMPEGGeneratorImplementsGenerator(t *testing.T) {
|
||||
require.Implements(t, (*generator.Generator)(nil), new(generator.FFMPEGGenerator))
|
||||
require.Implements(t, (*generator.Handled)(nil), new(generator.FFMPEGGenerator))
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package generator
|
||||
|
||||
import (
|
||||
"segmenta/pkg/handler"
|
||||
)
|
||||
|
||||
// TODO: define or not?
|
||||
type Generator interface {
|
||||
Generate() error
|
||||
}
|
||||
|
||||
// TODO: define or not?
|
||||
type Handled interface {
|
||||
AddHandler(h handler.Handler)
|
||||
RemoveHandler(h handler.Handler)
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
type AssetType int
|
||||
|
||||
const (
|
||||
Segment AssetType = 0
|
||||
MediaPlaylist
|
||||
MasterPlaylist
|
||||
)
|
||||
|
||||
type Asset struct {
|
||||
Name string
|
||||
Key string
|
||||
Type AssetType
|
||||
ContentType string
|
||||
Body io.Reader
|
||||
}
|
||||
|
||||
type Handler interface {
|
||||
AssetAdded(a Asset) error
|
||||
AssetRemoved(a Asset) error
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
)
|
||||
|
||||
const DefaultAwsRegion = "eu-west-1"
|
||||
|
||||
var (
|
||||
s3Session *session.Session
|
||||
s3Client *s3.S3
|
||||
s3Uploader *s3manager.Uploader
|
||||
)
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
s3Session, err = session.NewSession(&aws.Config{Region: aws.String(DefaultAwsRegion)})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
s3Client = s3.New(s3Session)
|
||||
s3Uploader = s3manager.NewUploader(s3Session)
|
||||
}
|
||||
|
||||
type S3Handler struct {
|
||||
Bucket string
|
||||
ACL string
|
||||
}
|
||||
|
||||
func NewS3Handler(bucket string, acl string) *S3Handler {
|
||||
return &S3Handler{
|
||||
Bucket: bucket,
|
||||
ACL: acl,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *S3Handler) AssetAdded(a Asset) error {
|
||||
result, err := s3Uploader.Upload(&s3manager.UploadInput{
|
||||
Key: aws.String(a.Key),
|
||||
Bucket: aws.String(h.Bucket),
|
||||
ACL: aws.String(h.ACL),
|
||||
Body: a.Body,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println("Uploaded", a.Name, "to", result.Location)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *S3Handler) AssetRemoved(a Asset) error {
|
||||
// TODO consider cleaning up assets every ~60 seconds?
|
||||
// https://github.com/aws/aws-sdk-go/blob/v1.33.4/service/s3/s3manager/batch.go#L255
|
||||
_, err := s3Client.DeleteObject(&s3.DeleteObjectInput{
|
||||
Key: aws.String(a.Key),
|
||||
Bucket: aws.String(h.Bucket),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package handler_test
|
||||
|
||||
import (
|
||||
"segmenta/pkg/handler"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestS3HandlerImplementsHandler(t *testing.T) {
|
||||
require.Implements(t, (*handler.Handler)(nil), new(handler.S3Handler))
|
||||
}
|
Loading…
Reference in New Issue