Compare commits
No commits in common. "dev" and "refactor" have entirely different histories.
|
@ -1 +0,0 @@
|
||||||
/segmento
|
|
9
go.mod
9
go.mod
|
@ -1,9 +1,10 @@
|
||||||
module segmento
|
module segmenta
|
||||||
|
|
||||||
go 1.14
|
go 1.14
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/aws/aws-sdk-go v1.33.4
|
github.com/aws/aws-sdk-go v1.33.5
|
||||||
github.com/stretchr/testify v1.6.1
|
github.com/stretchr/testify v1.5.1
|
||||||
github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300
|
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
|
||||||
|
gopkg.in/fsnotify.v1 v1.4.7
|
||||||
)
|
)
|
||||||
|
|
18
go.sum
18
go.sum
|
@ -1,5 +1,5 @@
|
||||||
github.com/aws/aws-sdk-go v1.33.4 h1:lhVZe2TkSjJz26jPBCBAvJvAy70Yxxlbm/Ciw1gmyRY=
|
github.com/aws/aws-sdk-go v1.33.5 h1:p2fr1ryvNTU6avUWLI+/H7FGv0TBIjzVM5WDgXBBv4U=
|
||||||
github.com/aws/aws-sdk-go v1.33.4/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
|
github.com/aws/aws-sdk-go v1.33.5/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
|
||||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||||
|
@ -8,19 +8,17 @@ github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeY
|
||||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
|
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
|
||||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
|
||||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
|
||||||
github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 h1:XQdibLKagjdevRB6vAjVY4qbSr8rQ610YzTkWcxzxSI=
|
|
||||||
github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300/go.mod h1:FNa/dfN95vAYCNFrIKRrlRo+MBLbwmR9Asa5f2ljmBI=
|
|
||||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo=
|
||||||
|
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
|
||||||
|
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||||
|
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
||||||
|
|
46
main.go
46
main.go
|
@ -1,51 +1,27 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
|
||||||
"os"
|
"os"
|
||||||
"segmento/pkg/media"
|
"segmenta/pkg/generator"
|
||||||
"segmento/pkg/playlist"
|
"segmenta/pkg/handler"
|
||||||
"segmento/pkg/s3"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// How to stream a static video file as a "live" stream?
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// TODO accept some flags with:
|
handler := handler.NewS3Handler("rfwatson-hls", "public-read")
|
||||||
// URL - source of stream
|
generator := generator.NewFFMPEGGenerator("test", os.Stdin)
|
||||||
// TargetLength - length of segments in seconds
|
generator.AddHandler(handler)
|
||||||
// Output
|
|
||||||
// -d some_dir/ => output playlist and chunks to this directory, cleaning up old files from time to time.
|
|
||||||
// -b 0.0.0.0:3000 => serve playlist and chunks from an HTTP server bound to this address
|
|
||||||
|
|
||||||
var url string
|
fmt.Println("starting generator...")
|
||||||
flag.StringVar(&url, "url", "", "URL of MP3 stream")
|
|
||||||
|
|
||||||
flag.Parse()
|
err := generator.Generate()
|
||||||
|
|
||||||
if url == "" {
|
|
||||||
log.Println("Invalid arguments")
|
|
||||||
flag.PrintDefaults()
|
|
||||||
os.Exit(-1)
|
|
||||||
}
|
|
||||||
|
|
||||||
client := &http.Client{}
|
|
||||||
resp, err := client.Get(url)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer resp.Body.Close()
|
fmt.Println("completed generator...")
|
||||||
|
|
||||||
segmenter := media.NewMP3Segmenter()
|
|
||||||
publisher := &media.FakePublisher{}
|
|
||||||
playlist := playlist.NewMediaPlaylist(resp.Body, segmenter, publisher)
|
|
||||||
playlist.AddConsumer(&s3.Consumer{})
|
|
||||||
|
|
||||||
if err = playlist.Run(); err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println("exiting")
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,152 @@
|
||||||
|
package generator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"segmenta/pkg/handler"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gopkg.in/fsnotify.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
const AppName = "segmenta"
|
||||||
|
const DefaultPlaylistFilename = "playlist.m3u8"
|
||||||
|
|
||||||
|
// Generator generates assets for a given stream. The default implementation
|
||||||
|
// segments using FFMPEG.
|
||||||
|
type FFMPEGGenerator struct {
|
||||||
|
name string
|
||||||
|
src io.Reader
|
||||||
|
handlers map[handler.Handler]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFFMPEGGenerator(name string, src io.Reader) *FFMPEGGenerator {
|
||||||
|
return &FFMPEGGenerator{
|
||||||
|
name: name,
|
||||||
|
src: src,
|
||||||
|
handlers: make(map[handler.Handler]bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *FFMPEGGenerator) Generate() error {
|
||||||
|
// first, create a temporary directory.
|
||||||
|
prefix := fmt.Sprintf("%s-%s", AppName, g.name)
|
||||||
|
tmpdir, err := ioutil.TempDir("", prefix)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("ffmpeg: error initializing TempDir: %v", tmpdir)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpdir)
|
||||||
|
|
||||||
|
playlist := filepath.Join(tmpdir, DefaultPlaylistFilename)
|
||||||
|
|
||||||
|
// FIXME: Better concurrency management is needed here. We should expect doMMPEG to run
|
||||||
|
// indefinitely, and its eventual closure (successful or otherwise) to trigger closure
|
||||||
|
// of this main process, including other goroutines.
|
||||||
|
// one instance of doWatch should be running at all times until the FFMPEG goroutine is
|
||||||
|
// closed. If it is interrupted for some reason before doFFMPEG has terminated, then
|
||||||
|
// this is a bug. It should be restarted.
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
go g.doFFMPEG(&wg, playlist)
|
||||||
|
go g.doWatch(&wg, tmpdir)
|
||||||
|
|
||||||
|
wg.Add(2)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *FFMPEGGenerator) AddHandler(h handler.Handler) {
|
||||||
|
g.handlers[h] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *FFMPEGGenerator) RemoveHandler(h handler.Handler) {
|
||||||
|
delete(g.handlers, h)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO should wg be passed in a Context?
|
||||||
|
func (g *FFMPEGGenerator) doFFMPEG(wg *sync.WaitGroup, dest string) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
cmd := exec.Command("ffmpeg", "-re", "-i", "-", "-f", "hls", "-hls_flags", "delete_segments", dest)
|
||||||
|
cmd.Stdin = g.src
|
||||||
|
|
||||||
|
if err := cmd.Run(); err != nil {
|
||||||
|
// TODO: error handling
|
||||||
|
log.Fatalf("ffmpeg: error running cmd: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("ffmpeg: exited")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *FFMPEGGenerator) doWatch(wg *sync.WaitGroup, srcDir string) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
watcher, err := fsnotify.NewWatcher()
|
||||||
|
if err != nil {
|
||||||
|
// TODO: error handling
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer watcher.Close()
|
||||||
|
|
||||||
|
err = watcher.Add(srcDir)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event, ok := <-watcher.Events:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := filepath.Join(g.name, filepath.Base(event.Name))
|
||||||
|
if event.Op&fsnotify.Create == fsnotify.Create && strings.HasSuffix(event.Name, ".m3u8") {
|
||||||
|
asset := handler.Asset{
|
||||||
|
Key: key,
|
||||||
|
Type: handler.MediaPlaylist,
|
||||||
|
ContentType: "vnd.apple.mpegURL",
|
||||||
|
Path: event.Name,
|
||||||
|
}
|
||||||
|
for h, _ := range g.handlers {
|
||||||
|
h.AssetAdded(asset)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if event.Op&fsnotify.Write == fsnotify.Write && strings.HasSuffix(event.Name, ".ts") {
|
||||||
|
asset := handler.Asset{
|
||||||
|
Key: key,
|
||||||
|
Type: handler.Segment,
|
||||||
|
ContentType: "video/MP2T",
|
||||||
|
Path: event.Name,
|
||||||
|
}
|
||||||
|
for h, _ := range g.handlers {
|
||||||
|
h.AssetAdded(asset)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if event.Op&fsnotify.Remove == fsnotify.Remove && strings.HasSuffix(event.Name, ".ts") {
|
||||||
|
asset := handler.Asset{
|
||||||
|
Key: key,
|
||||||
|
Type: handler.Segment,
|
||||||
|
ContentType: "video/MP2T",
|
||||||
|
Path: event.Name,
|
||||||
|
}
|
||||||
|
for h, _ := range g.handlers {
|
||||||
|
h.AssetRemoved(asset)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case err, ok := <-watcher.Errors:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Println("Got error:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("watch: exited")
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package generator_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"segmenta/pkg/generator"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFFMPEGGeneratorImplementsGenerator(t *testing.T) {
|
||||||
|
require.Implements(t, (*generator.Generator)(nil), new(generator.FFMPEGGenerator))
|
||||||
|
require.Implements(t, (*generator.Handled)(nil), new(generator.FFMPEGGenerator))
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package generator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"segmenta/pkg/handler"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Generator implements behaviour for generating segments from some incoming media stream.
|
||||||
|
// TODO: possibly rename to Segmenter?
|
||||||
|
// TODO: define or not?
|
||||||
|
type Generator interface {
|
||||||
|
Generate() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: define or not?
|
||||||
|
type Handled interface {
|
||||||
|
AddHandler(h handler.Handler)
|
||||||
|
RemoveHandler(h handler.Handler)
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
package handler
|
||||||
|
|
||||||
|
type AssetType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
Segment AssetType = 0
|
||||||
|
MediaPlaylist
|
||||||
|
MasterPlaylist
|
||||||
|
)
|
||||||
|
|
||||||
|
type Asset struct {
|
||||||
|
Path string
|
||||||
|
Key string // should this be passed with the asset? or decided upon by the handler?
|
||||||
|
Type AssetType
|
||||||
|
ContentType string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Handler interface {
|
||||||
|
AssetAdded(a Asset) error
|
||||||
|
|
||||||
|
// TODO it is probably better to remove this because it will be difficult to ensure
|
||||||
|
// that all old assets are always cleaned up (it would be easy for this callback to be missed,
|
||||||
|
// for example). Better for implementations to be responsible for cleaning up old files, either
|
||||||
|
// using an implementation-specific method (e.g. S3 expiring objects) or just a periodic check.
|
||||||
|
AssetRemoved(a Asset) error
|
||||||
|
}
|
|
@ -0,0 +1,81 @@
|
||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||||
|
)
|
||||||
|
|
||||||
|
const DefaultAwsRegion = "eu-west-1"
|
||||||
|
|
||||||
|
var (
|
||||||
|
s3Session *session.Session
|
||||||
|
s3Client *s3.S3
|
||||||
|
s3Uploader *s3manager.Uploader
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
var err error
|
||||||
|
s3Session, err = session.NewSession(&aws.Config{Region: aws.String(DefaultAwsRegion)})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s3Client = s3.New(s3Session)
|
||||||
|
s3Uploader = s3manager.NewUploader(s3Session)
|
||||||
|
}
|
||||||
|
|
||||||
|
type S3Handler struct {
|
||||||
|
Bucket string
|
||||||
|
ACL string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewS3Handler(bucket string, acl string) *S3Handler {
|
||||||
|
return &S3Handler{
|
||||||
|
Bucket: bucket,
|
||||||
|
ACL: acl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *S3Handler) AssetAdded(a Asset) error {
|
||||||
|
f, err := os.Open(a.Path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
result, err := s3Uploader.Upload(&s3manager.UploadInput{
|
||||||
|
Key: aws.String(a.Key),
|
||||||
|
Bucket: aws.String(h.Bucket),
|
||||||
|
ACL: aws.String(h.ACL),
|
||||||
|
Body: f,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Uploaded", a.Path, "to", result.Location)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *S3Handler) AssetRemoved(a Asset) error {
|
||||||
|
// TODO consider cleaning up assets every ~60 seconds?
|
||||||
|
// https://github.com/aws/aws-sdk-go/blob/v1.33.4/service/s3/s3manager/batch.go#L255
|
||||||
|
_, err := s3Client.DeleteObject(&s3.DeleteObjectInput{
|
||||||
|
Key: aws.String(a.Key),
|
||||||
|
Bucket: aws.String(h.Bucket),
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println("Deleted", a.Key)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
package handler_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"segmenta/pkg/handler"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestS3HandlerImplementsHandler(t *testing.T) {
|
||||||
|
require.Implements(t, (*handler.Handler)(nil), new(handler.S3Handler))
|
||||||
|
}
|
|
@ -1,127 +0,0 @@
|
||||||
// Package media contains logic related to parsing and segmenting media
|
|
||||||
// streams, such as MP3 streams.
|
|
||||||
//
|
|
||||||
// It is depended upon by the playlist package.
|
|
||||||
package media
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/tcolgate/mp3"
|
|
||||||
)
|
|
||||||
|
|
||||||
const DefaultTargetDuration = 3 * time.Second
|
|
||||||
|
|
||||||
type Segmenter interface {
|
|
||||||
Segment(r io.Reader) (chan *Segment, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type SegmentPublisher interface {
|
|
||||||
Publish(s *Segment) (string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Segment struct {
|
|
||||||
duration time.Duration
|
|
||||||
targetDuration time.Duration
|
|
||||||
data *bytes.Buffer
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal
|
|
||||||
// buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size,
|
|
||||||
// depending on the implementation.
|
|
||||||
func NewSegment(targetDuration time.Duration, capacity int) *Segment {
|
|
||||||
return &Segment{
|
|
||||||
data: bytes.NewBuffer(make([]byte, 0, capacity)),
|
|
||||||
duration: 0,
|
|
||||||
targetDuration: targetDuration,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) {
|
|
||||||
return s.data.ReadFrom(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Segment) IncrementDuration(d time.Duration) time.Duration {
|
|
||||||
s.duration += d
|
|
||||||
return s.duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Segment) CanWrite(d time.Duration) bool {
|
|
||||||
return s.targetDuration-s.duration >= d
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Segment) Duration() time.Duration {
|
|
||||||
return s.duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Segment) Len() int {
|
|
||||||
return s.data.Len()
|
|
||||||
}
|
|
||||||
|
|
||||||
type MP3Segmenter struct {
|
|
||||||
decoder *mp3.Decoder
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *MP3Segmenter) Segment(r io.Reader) (chan *Segment, error) {
|
|
||||||
c := make(chan *Segment)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
d := mp3.NewDecoder(r)
|
|
||||||
|
|
||||||
var (
|
|
||||||
v mp3.Frame
|
|
||||||
skipped int
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
s *Segment
|
|
||||||
)
|
|
||||||
|
|
||||||
for {
|
|
||||||
if err := d.Decode(&v, &skipped); err != nil {
|
|
||||||
if err == io.EOF {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if s != nil && !s.CanWrite(v.Duration()) {
|
|
||||||
// publish the current segment, and initialize a new one.
|
|
||||||
c <- s
|
|
||||||
s = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if s == nil {
|
|
||||||
// TODO what is a good initial capacity?
|
|
||||||
s = NewSegment(DefaultTargetDuration, 1024)
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := s.ReadFrom(v.Reader())
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err) // TODO: some proper error handling
|
|
||||||
}
|
|
||||||
|
|
||||||
if n != int64(v.Size()) {
|
|
||||||
// TODO would this ever happen? What should IncrementDuration be called with?
|
|
||||||
log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.IncrementDuration(v.Duration())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return c, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMP3Segmenter() *MP3Segmenter {
|
|
||||||
return &MP3Segmenter{}
|
|
||||||
}
|
|
||||||
|
|
||||||
type FakePublisher struct{}
|
|
||||||
|
|
||||||
func (p *FakePublisher) Publish(s *Segment) (string, error) {
|
|
||||||
return "https://www.example.com/segment.mp3", nil
|
|
||||||
}
|
|
|
@ -1,27 +0,0 @@
|
||||||
package media_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"segmento/pkg/media"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestSegmenterImplements(t *testing.T) {
|
|
||||||
require.Implements(t, (*media.Segmenter)(nil), new(media.MP3Segmenter))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSegment(t *testing.T) {
|
|
||||||
segment := media.NewSegment(10*time.Second, 0)
|
|
||||||
|
|
||||||
require.Equal(t, time.Duration(0), segment.Duration())
|
|
||||||
require.True(t, segment.CanWrite(9*time.Second))
|
|
||||||
require.True(t, segment.CanWrite(10*time.Second))
|
|
||||||
require.False(t, segment.CanWrite(11*time.Second))
|
|
||||||
|
|
||||||
d := segment.IncrementDuration(10 * time.Second)
|
|
||||||
require.Equal(t, segment.Duration(), d)
|
|
||||||
require.Equal(t, 10*time.Second, segment.Duration())
|
|
||||||
require.False(t, segment.CanWrite(1*time.Millisecond))
|
|
||||||
}
|
|
|
@ -1,128 +0,0 @@
|
||||||
package playlist
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"segmento/pkg/media"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
const DefaultMaxSegments = 10
|
|
||||||
|
|
||||||
type PlaylistSegment struct {
|
|
||||||
media.Segment
|
|
||||||
url string
|
|
||||||
seqId int
|
|
||||||
}
|
|
||||||
|
|
||||||
type Playlist interface {
|
|
||||||
Len() int
|
|
||||||
AddConsumer(c Consumer)
|
|
||||||
RemoveConsumer(c Consumer) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type Consumer interface {
|
|
||||||
PlaylistUpdated(p Playlist)
|
|
||||||
PlaylistSegmentAdded(p Playlist, s *PlaylistSegment)
|
|
||||||
}
|
|
||||||
|
|
||||||
// so we know that we can publish a segment (i.e. make available a URL to access it from elsewhere)
|
|
||||||
// but what does it mean to publish a Playlist?
|
|
||||||
// A playlist contains lists of Segments but it wouldn't necessarily be published in the same location
|
|
||||||
// for example, Segments may be published to S3 but a playlist may be published periodically to a static
|
|
||||||
// hosting location elsewhere.
|
|
||||||
|
|
||||||
type MediaPlaylist struct {
|
|
||||||
nextSeqId int
|
|
||||||
src io.Reader // read a stream of bytes e.g. MP3
|
|
||||||
segmenter media.Segmenter // segment the incoming bytes. For now, up to the caller to provider a matching src and segmenter.
|
|
||||||
publisher media.SegmentPublisher // publish the segments somewhere (i.e. make available a URL with them)
|
|
||||||
segments []*PlaylistSegment // a slice of the last n segments
|
|
||||||
consumers map[Consumer]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO does it make sense to do 50% dependency injection and 50% consumers here?
|
|
||||||
// Why not just pass everything through the consumer?
|
|
||||||
// Or how about define these methods on the interface and force implementations to provide them?
|
|
||||||
func NewMediaPlaylist(src io.Reader, segmenter media.Segmenter, publisher media.SegmentPublisher) *MediaPlaylist {
|
|
||||||
p := MediaPlaylist{
|
|
||||||
src: src,
|
|
||||||
segmenter: segmenter,
|
|
||||||
publisher: publisher,
|
|
||||||
segments: make([]*PlaylistSegment, 0, 10),
|
|
||||||
consumers: make(map[Consumer]bool),
|
|
||||||
}
|
|
||||||
return &p
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *MediaPlaylist) Len() int {
|
|
||||||
return len(p.segments)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *MediaPlaylist) AddConsumer(c Consumer) {
|
|
||||||
p.consumers[c] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *MediaPlaylist) RemoveConsumer(c Consumer) error {
|
|
||||||
delete(p.consumers, c)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *MediaPlaylist) Run() error {
|
|
||||||
segments, err := p.segmenter.Segment(p.src)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for s := range segments {
|
|
||||||
if err = p.handleSegment(s); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *MediaPlaylist) handleSegment(s *media.Segment) error {
|
|
||||||
// first, publish the segment:
|
|
||||||
url, err := p.publisher.Publish(s)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// initialize a new playlist segment:
|
|
||||||
nextSeqId := p.nextSeqId
|
|
||||||
p.nextSeqId++
|
|
||||||
ps := PlaylistSegment{
|
|
||||||
Segment: *s, // TODO make the Segmenter publish values, not pointers
|
|
||||||
seqId: nextSeqId,
|
|
||||||
url: url,
|
|
||||||
}
|
|
||||||
|
|
||||||
// append the playlist segment to our slice of segments:
|
|
||||||
p.segments = append(p.segments, &ps)
|
|
||||||
|
|
||||||
// trim the start of the playlist if needed:
|
|
||||||
if len(p.segments) > DefaultMaxSegments {
|
|
||||||
p.segments = p.segments[len(p.segments)-DefaultMaxSegments:]
|
|
||||||
}
|
|
||||||
|
|
||||||
for c, _ := range p.consumers {
|
|
||||||
c.PlaylistSegmentAdded(p, &ps)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *MediaPlaylist) Render() string {
|
|
||||||
var r string
|
|
||||||
r += "#EXTM3U\n"
|
|
||||||
r += "#EXT-X-VERSION:3\n"
|
|
||||||
r += "#EXT-X-TARGETDURATION:3\n" // TODO
|
|
||||||
for _, s := range p.segments {
|
|
||||||
r += fmt.Sprintf("#EXTINF:%.05f\n", float32(s.Duration())/float32(time.Second))
|
|
||||||
r += "http://www.example.com/x.mp3\n"
|
|
||||||
}
|
|
||||||
r += "#EXT-X-ENDLIST"
|
|
||||||
return r
|
|
||||||
}
|
|
|
@ -1,102 +0,0 @@
|
||||||
package playlist_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"segmento/pkg/media"
|
|
||||||
"segmento/pkg/playlist"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
type FakeReader struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *FakeReader) Read([]byte) (int, error) {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type FakeSegmenter struct {
|
|
||||||
count int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *FakeSegmenter) Segment(r io.Reader) (chan *media.Segment, error) {
|
|
||||||
c := make(chan *media.Segment)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
dur := 2500 * time.Millisecond
|
|
||||||
for i := 0; i < s.count; i++ {
|
|
||||||
segment := media.NewSegment(dur, 0)
|
|
||||||
segment.IncrementDuration(dur)
|
|
||||||
c <- segment
|
|
||||||
}
|
|
||||||
close(c)
|
|
||||||
}()
|
|
||||||
|
|
||||||
return c, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type FakeSegmentPublisher struct {
|
|
||||||
count int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *FakeSegmentPublisher) Publish(s *media.Segment) (string, error) {
|
|
||||||
p.count++
|
|
||||||
return "", nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type consumer struct {
|
|
||||||
pCount, sCount int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consumer) PlaylistUpdated(p playlist.Playlist) {
|
|
||||||
c.pCount++
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consumer) PlaylistSegmentAdded(p playlist.Playlist, s *playlist.PlaylistSegment) {
|
|
||||||
c.sCount++
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMediaPlaylistImplements(t *testing.T) {
|
|
||||||
require.Implements(t, (*playlist.Playlist)(nil), new(playlist.MediaPlaylist))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMediaPlaylist(t *testing.T) {
|
|
||||||
publisher := &FakeSegmentPublisher{}
|
|
||||||
playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{3}, publisher)
|
|
||||||
|
|
||||||
err := playlist.Run()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 3, playlist.Len())
|
|
||||||
require.Equal(t, 3, publisher.count)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMediaPlaylistRender(t *testing.T) {
|
|
||||||
playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{2}, &FakeSegmentPublisher{})
|
|
||||||
err := playlist.Run()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
lines := strings.Split(playlist.Render(), "\n")
|
|
||||||
|
|
||||||
require.Equal(t, "#EXTM3U", lines[0])
|
|
||||||
require.Equal(t, "#EXT-X-VERSION:3", lines[1])
|
|
||||||
require.Equal(t, "#EXT-X-TARGETDURATION:3", lines[2])
|
|
||||||
require.Equal(t, "#EXTINF:2.50000", lines[3])
|
|
||||||
require.Equal(t, "http://www.example.com/x.mp3", lines[4])
|
|
||||||
require.Equal(t, "#EXTINF:2.50000", lines[5])
|
|
||||||
require.Equal(t, "http://www.example.com/x.mp3", lines[6])
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMediaPlaylistConsumer(t *testing.T) {
|
|
||||||
consumer := &consumer{}
|
|
||||||
require.Implements(t, (*playlist.Consumer)(nil), consumer)
|
|
||||||
|
|
||||||
playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{4}, &FakeSegmentPublisher{})
|
|
||||||
playlist.AddConsumer(consumer)
|
|
||||||
err := playlist.Run()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 4, consumer.sCount)
|
|
||||||
require.Equal(t, 0, consumer.pCount) // TODO
|
|
||||||
}
|
|
37
pkg/s3/s3.go
37
pkg/s3/s3.go
|
@ -1,37 +0,0 @@
|
||||||
package s3
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"segmento/pkg/playlist"
|
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
|
||||||
)
|
|
||||||
|
|
||||||
const DefaultAwsRegion = "eu-west-1"
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
sess, err := session.NewSession(&aws.Config{Region: aws.String(DefaultAwsRegion)})
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type Consumer struct {
|
|
||||||
S3Bucket string
|
|
||||||
S3PathPrefix string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewConsumer(bucket string, pathPrefix string) *Consumer {
|
|
||||||
c := Consumer{bucket, pathPrefix}
|
|
||||||
return &c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Consumer) PlaylistUpdated(p playlist.Playlist) {
|
|
||||||
fmt.Println("s3: PlaylistUpdated")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Consumer) PlaylistSegmentAdded(p playlist.Playlist, s *playlist.PlaylistSegment) {
|
|
||||||
fmt.Println("s3: PlaylistSegmentAdded")
|
|
||||||
}
|
|
|
@ -0,0 +1 @@
|
||||||
|
package web
|
Loading…
Reference in New Issue