diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1f91e5b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/segmento diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..64c1c5f --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module segmento + +go 1.14 + +require ( + github.com/aws/aws-sdk-go v1.33.4 + github.com/stretchr/testify v1.6.1 + github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9e95fd7 --- /dev/null +++ b/go.sum @@ -0,0 +1,26 @@ +github.com/aws/aws-sdk-go v1.33.4 h1:lhVZe2TkSjJz26jPBCBAvJvAy70Yxxlbm/Ciw1gmyRY= +github.com/aws/aws-sdk-go v1.33.4/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 h1:XQdibLKagjdevRB6vAjVY4qbSr8rQ610YzTkWcxzxSI= +github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300/go.mod h1:FNa/dfN95vAYCNFrIKRrlRo+MBLbwmR9Asa5f2ljmBI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..57191ca --- /dev/null +++ b/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "flag" + "log" + "net/http" + "os" + "segmento/pkg/media" + "segmento/pkg/playlist" + "segmento/pkg/s3" +) + +func main() { + // TODO accept some flags with: + // URL - source of stream + // TargetLength - length of segments in seconds + // Output + // -d some_dir/ => output playlist and chunks to this directory, cleaning up old files from time to time. + // -b 0.0.0.0:3000 => serve playlist and chunks from an HTTP server bound to this address + + var url string + flag.StringVar(&url, "url", "", "URL of MP3 stream") + + flag.Parse() + + if url == "" { + log.Println("Invalid arguments") + flag.PrintDefaults() + os.Exit(-1) + } + + client := &http.Client{} + resp, err := client.Get(url) + + if err != nil { + log.Fatal(err) + } + + defer resp.Body.Close() + + segmenter := media.NewMP3Segmenter() + publisher := &media.FakePublisher{} + playlist := playlist.NewMediaPlaylist(resp.Body, segmenter, publisher) + playlist.AddConsumer(&s3.Consumer{}) + + if err = playlist.Run(); err != nil { + log.Fatal(err) + } + + log.Println("exiting") +} diff --git a/pkg/media/media.go b/pkg/media/media.go new file mode 100644 index 0000000..1040e13 --- /dev/null +++ b/pkg/media/media.go @@ -0,0 +1,127 @@ +// Package media contains logic related to parsing and segmenting media +// streams, such as MP3 streams. +// +// It is depended upon by the playlist package. +package media + +import ( + "bytes" + "io" + "log" + "time" + + "github.com/tcolgate/mp3" +) + +const DefaultTargetDuration = 3 * time.Second + +type Segmenter interface { + Segment(r io.Reader) (chan *Segment, error) +} + +type SegmentPublisher interface { + Publish(s *Segment) (string, error) +} + +type Segment struct { + duration time.Duration + targetDuration time.Duration + data *bytes.Buffer +} + +// Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal +// buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size, +// depending on the implementation. +func NewSegment(targetDuration time.Duration, capacity int) *Segment { + return &Segment{ + data: bytes.NewBuffer(make([]byte, 0, capacity)), + duration: 0, + targetDuration: targetDuration, + } +} + +func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) { + return s.data.ReadFrom(r) +} + +func (s *Segment) IncrementDuration(d time.Duration) time.Duration { + s.duration += d + return s.duration +} + +func (s *Segment) CanWrite(d time.Duration) bool { + return s.targetDuration-s.duration >= d +} + +func (s *Segment) Duration() time.Duration { + return s.duration +} + +func (s *Segment) Len() int { + return s.data.Len() +} + +type MP3Segmenter struct { + decoder *mp3.Decoder +} + +func (s *MP3Segmenter) Segment(r io.Reader) (chan *Segment, error) { + c := make(chan *Segment) + + go func() { + d := mp3.NewDecoder(r) + + var ( + v mp3.Frame + skipped int + ) + + var ( + s *Segment + ) + + for { + if err := d.Decode(&v, &skipped); err != nil { + if err == io.EOF { + break + } + log.Fatal(err) + } + + if s != nil && !s.CanWrite(v.Duration()) { + // publish the current segment, and initialize a new one. + c <- s + s = nil + } + + if s == nil { + // TODO what is a good initial capacity? + s = NewSegment(DefaultTargetDuration, 1024) + } + + n, err := s.ReadFrom(v.Reader()) + if err != nil { + log.Fatal(err) // TODO: some proper error handling + } + + if n != int64(v.Size()) { + // TODO would this ever happen? What should IncrementDuration be called with? + log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n) + } + + s.IncrementDuration(v.Duration()) + } + }() + + return c, nil +} + +func NewMP3Segmenter() *MP3Segmenter { + return &MP3Segmenter{} +} + +type FakePublisher struct{} + +func (p *FakePublisher) Publish(s *Segment) (string, error) { + return "https://www.example.com/segment.mp3", nil +} diff --git a/pkg/media/media_test.go b/pkg/media/media_test.go new file mode 100644 index 0000000..0146356 --- /dev/null +++ b/pkg/media/media_test.go @@ -0,0 +1,27 @@ +package media_test + +import ( + "segmento/pkg/media" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSegmenterImplements(t *testing.T) { + require.Implements(t, (*media.Segmenter)(nil), new(media.MP3Segmenter)) +} + +func TestSegment(t *testing.T) { + segment := media.NewSegment(10*time.Second, 0) + + require.Equal(t, time.Duration(0), segment.Duration()) + require.True(t, segment.CanWrite(9*time.Second)) + require.True(t, segment.CanWrite(10*time.Second)) + require.False(t, segment.CanWrite(11*time.Second)) + + d := segment.IncrementDuration(10 * time.Second) + require.Equal(t, segment.Duration(), d) + require.Equal(t, 10*time.Second, segment.Duration()) + require.False(t, segment.CanWrite(1*time.Millisecond)) +} diff --git a/pkg/playlist/playlist.go b/pkg/playlist/playlist.go new file mode 100644 index 0000000..82cef5b --- /dev/null +++ b/pkg/playlist/playlist.go @@ -0,0 +1,128 @@ +package playlist + +import ( + "fmt" + "io" + "segmento/pkg/media" + "time" +) + +const DefaultMaxSegments = 10 + +type PlaylistSegment struct { + media.Segment + url string + seqId int +} + +type Playlist interface { + Len() int + AddConsumer(c Consumer) + RemoveConsumer(c Consumer) error +} + +type Consumer interface { + PlaylistUpdated(p Playlist) + PlaylistSegmentAdded(p Playlist, s *PlaylistSegment) +} + +// so we know that we can publish a segment (i.e. make available a URL to access it from elsewhere) +// but what does it mean to publish a Playlist? +// A playlist contains lists of Segments but it wouldn't necessarily be published in the same location +// for example, Segments may be published to S3 but a playlist may be published periodically to a static +// hosting location elsewhere. + +type MediaPlaylist struct { + nextSeqId int + src io.Reader // read a stream of bytes e.g. MP3 + segmenter media.Segmenter // segment the incoming bytes. For now, up to the caller to provider a matching src and segmenter. + publisher media.SegmentPublisher // publish the segments somewhere (i.e. make available a URL with them) + segments []*PlaylistSegment // a slice of the last n segments + consumers map[Consumer]bool +} + +// TODO does it make sense to do 50% dependency injection and 50% consumers here? +// Why not just pass everything through the consumer? +// Or how about define these methods on the interface and force implementations to provide them? +func NewMediaPlaylist(src io.Reader, segmenter media.Segmenter, publisher media.SegmentPublisher) *MediaPlaylist { + p := MediaPlaylist{ + src: src, + segmenter: segmenter, + publisher: publisher, + segments: make([]*PlaylistSegment, 0, 10), + consumers: make(map[Consumer]bool), + } + return &p +} + +func (p *MediaPlaylist) Len() int { + return len(p.segments) +} + +func (p *MediaPlaylist) AddConsumer(c Consumer) { + p.consumers[c] = true +} + +func (p *MediaPlaylist) RemoveConsumer(c Consumer) error { + delete(p.consumers, c) + return nil +} + +func (p *MediaPlaylist) Run() error { + segments, err := p.segmenter.Segment(p.src) + if err != nil { + return err + } + + for s := range segments { + if err = p.handleSegment(s); err != nil { + return err + } + } + + return nil +} + +func (p *MediaPlaylist) handleSegment(s *media.Segment) error { + // first, publish the segment: + url, err := p.publisher.Publish(s) + if err != nil { + return err + } + + // initialize a new playlist segment: + nextSeqId := p.nextSeqId + p.nextSeqId++ + ps := PlaylistSegment{ + Segment: *s, // TODO make the Segmenter publish values, not pointers + seqId: nextSeqId, + url: url, + } + + // append the playlist segment to our slice of segments: + p.segments = append(p.segments, &ps) + + // trim the start of the playlist if needed: + if len(p.segments) > DefaultMaxSegments { + p.segments = p.segments[len(p.segments)-DefaultMaxSegments:] + } + + for c, _ := range p.consumers { + c.PlaylistSegmentAdded(p, &ps) + } + + return nil +} + +func (p *MediaPlaylist) Render() string { + var r string + r += "#EXTM3U\n" + r += "#EXT-X-VERSION:3\n" + r += "#EXT-X-TARGETDURATION:3\n" // TODO + for _, s := range p.segments { + r += fmt.Sprintf("#EXTINF:%.05f\n", float32(s.Duration())/float32(time.Second)) + r += "http://www.example.com/x.mp3\n" + } + r += "#EXT-X-ENDLIST" + return r +} diff --git a/pkg/playlist/playlist_test.go b/pkg/playlist/playlist_test.go new file mode 100644 index 0000000..99aa6d8 --- /dev/null +++ b/pkg/playlist/playlist_test.go @@ -0,0 +1,102 @@ +package playlist_test + +import ( + "io" + "segmento/pkg/media" + "segmento/pkg/playlist" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type FakeReader struct { +} + +func (r *FakeReader) Read([]byte) (int, error) { + return 0, nil +} + +type FakeSegmenter struct { + count int +} + +func (s *FakeSegmenter) Segment(r io.Reader) (chan *media.Segment, error) { + c := make(chan *media.Segment) + + go func() { + dur := 2500 * time.Millisecond + for i := 0; i < s.count; i++ { + segment := media.NewSegment(dur, 0) + segment.IncrementDuration(dur) + c <- segment + } + close(c) + }() + + return c, nil +} + +type FakeSegmentPublisher struct { + count int +} + +func (p *FakeSegmentPublisher) Publish(s *media.Segment) (string, error) { + p.count++ + return "", nil +} + +type consumer struct { + pCount, sCount int +} + +func (c *consumer) PlaylistUpdated(p playlist.Playlist) { + c.pCount++ +} + +func (c *consumer) PlaylistSegmentAdded(p playlist.Playlist, s *playlist.PlaylistSegment) { + c.sCount++ +} + +func TestMediaPlaylistImplements(t *testing.T) { + require.Implements(t, (*playlist.Playlist)(nil), new(playlist.MediaPlaylist)) +} + +func TestMediaPlaylist(t *testing.T) { + publisher := &FakeSegmentPublisher{} + playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{3}, publisher) + + err := playlist.Run() + require.NoError(t, err) + require.Equal(t, 3, playlist.Len()) + require.Equal(t, 3, publisher.count) +} + +func TestMediaPlaylistRender(t *testing.T) { + playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{2}, &FakeSegmentPublisher{}) + err := playlist.Run() + require.NoError(t, err) + + lines := strings.Split(playlist.Render(), "\n") + + require.Equal(t, "#EXTM3U", lines[0]) + require.Equal(t, "#EXT-X-VERSION:3", lines[1]) + require.Equal(t, "#EXT-X-TARGETDURATION:3", lines[2]) + require.Equal(t, "#EXTINF:2.50000", lines[3]) + require.Equal(t, "http://www.example.com/x.mp3", lines[4]) + require.Equal(t, "#EXTINF:2.50000", lines[5]) + require.Equal(t, "http://www.example.com/x.mp3", lines[6]) +} + +func TestMediaPlaylistConsumer(t *testing.T) { + consumer := &consumer{} + require.Implements(t, (*playlist.Consumer)(nil), consumer) + + playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{4}, &FakeSegmentPublisher{}) + playlist.AddConsumer(consumer) + err := playlist.Run() + require.NoError(t, err) + require.Equal(t, 4, consumer.sCount) + require.Equal(t, 0, consumer.pCount) // TODO +} diff --git a/pkg/s3/s3.go b/pkg/s3/s3.go new file mode 100644 index 0000000..92b1fec --- /dev/null +++ b/pkg/s3/s3.go @@ -0,0 +1,37 @@ +package s3 + +import ( + "fmt" + "log" + "segmento/pkg/playlist" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" +) + +const DefaultAwsRegion = "eu-west-1" + +func init() { + sess, err := session.NewSession(&aws.Config{Region: aws.String(DefaultAwsRegion)}) + if err != nil { + log.Fatal(err) + } +} + +type Consumer struct { + S3Bucket string + S3PathPrefix string +} + +func NewConsumer(bucket string, pathPrefix string) *Consumer { + c := Consumer{bucket, pathPrefix} + return &c +} + +func (c *Consumer) PlaylistUpdated(p playlist.Playlist) { + fmt.Println("s3: PlaylistUpdated") +} + +func (c *Consumer) PlaylistSegmentAdded(p playlist.Playlist, s *playlist.PlaylistSegment) { + fmt.Println("s3: PlaylistSegmentAdded") +}