From 689929b27a836ab02fc6a20fb8131929260bdca3 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Sun, 5 Jul 2020 08:15:58 +0200 Subject: [PATCH 01/12] WIP: hacking --- .gitignore | 1 + main.go | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+) create mode 100644 .gitignore create mode 100644 main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1f91e5b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/segmento diff --git a/main.go b/main.go new file mode 100644 index 0000000..94895b3 --- /dev/null +++ b/main.go @@ -0,0 +1,120 @@ +package main + +import ( + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "time" + + "github.com/tcolgate/mp3" +) + +// TODO what should be the behaviour when adding a Frame to a Segment causes it to over-shoot +// the TargetDuration? +// should the frame be partially added, or not added at all? +// typical frame duration 36ms +type Segment struct { + TargetDuration time.Duration + Duration time.Duration + Data []byte +} + +func newSegment(capacity int) *Segment { + return &Segment{ + Data: make([]byte, 0, capacity), + } +} + +func (s *Segment) Write(p []byte) (n int, err error) { + fmt.Println("write", len(p), "bytes") + return len(p), nil +} + +type MP3HTTPSegmenter struct { + decoder *mp3.Decoder +} + +func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan Segment, error) { + c := make(chan Segment) + + go func() { + d := mp3.NewDecoder(r) + + var ( + v mp3.Frame + skipped int + ) + + for { + if err := d.Decode(&v, &skipped); err != nil { + log.Fatal(err) + } + + fmt.Println("decoded frame of", v.Size(), "bytes, duration", v.Duration(), ", skipped", skipped, "bytes") + } + }() + + return c, nil +} + +func newMP3HTTPSegmenter() *MP3HTTPSegmenter { + return &MP3HTTPSegmenter{} +} + +// TODO + +// A Segmenter class which allows the passing in of a Reader +// i.e. +// Segment(r io.Reader) (chan *Segment, error) +// This will create an mp3.NewDecoder(r) and store the returned decoder. +// As the Reader is read, the Segmenter will publish a stream of SegmentEvents + +func main() { + // TODO accept some flags with: + // URL - source of stream + // TargetLength - length of segments in seconds + // Output + // -d some_dir/ => output playlist and chunks to this directory, cleaning up old files from time to time. + // -b 0.0.0.0:3000 => serve playlist and chunks from an HTTP server bound to this address + + var url string + flag.StringVar(&url, "url", "", "URL of MP3 stream") + + flag.Parse() + + if url == "" { + log.Println("Invalid arguments") + flag.PrintDefaults() + os.Exit(-1) + } + + log.Println("Open URL:", url) + + client := &http.Client{} + resp, err := client.Get(url) + + if err != nil { + log.Fatal(err) + } + + defer resp.Body.Close() + + segmenter := newMP3HTTPSegmenter() + segments, err := segmenter.Segment(resp.Body) + if err != nil { + log.Fatal(err) + } + + go func() { + for segment := range segments { + log.Println("got segment", segment) + } + }() + + time.Sleep(10 * time.Second) + + log.Println("exiting") +} -- 2.40.1 From 17ff26daf33c648d62688af4f51e6fe096e04895 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 6 Jul 2020 13:29:44 +0200 Subject: [PATCH 02/12] Basic working segmentation of MP3 audio stream --- go.mod | 8 +++++ go.sum | 13 ++++++++ main.go | 84 +++++++++++++++++++++++++++++++++++++++------------- main_test.go | 22 ++++++++++++++ 4 files changed, 107 insertions(+), 20 deletions(-) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main_test.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2735aa5 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module segmento + +go 1.14 + +require ( + github.com/stretchr/testify v1.6.1 + github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..4126d87 --- /dev/null +++ b/go.sum @@ -0,0 +1,13 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 h1:XQdibLKagjdevRB6vAjVY4qbSr8rQ610YzTkWcxzxSI= +github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300/go.mod h1:FNa/dfN95vAYCNFrIKRrlRo+MBLbwmR9Asa5f2ljmBI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 94895b3..e7e34d0 100644 --- a/main.go +++ b/main.go @@ -1,8 +1,8 @@ package main import ( + "bytes" "flag" - "fmt" "io" "log" "net/http" @@ -12,33 +12,52 @@ import ( "github.com/tcolgate/mp3" ) -// TODO what should be the behaviour when adding a Frame to a Segment causes it to over-shoot -// the TargetDuration? -// should the frame be partially added, or not added at all? -// typical frame duration 36ms +const DefaultTargetDuration = 3 * time.Second + type Segment struct { - TargetDuration time.Duration - Duration time.Duration - Data []byte + targetDuration time.Duration + duration time.Duration + data *bytes.Buffer } -func newSegment(capacity int) *Segment { +// Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal +// buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size, +// depending on the implementation. +func newSegment(targetDuration time.Duration, capacity int) *Segment { return &Segment{ - Data: make([]byte, 0, capacity), + data: bytes.NewBuffer(make([]byte, 0, capacity)), + duration: 0, + targetDuration: targetDuration, } } -func (s *Segment) Write(p []byte) (n int, err error) { - fmt.Println("write", len(p), "bytes") - return len(p), nil +func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) { + return s.data.ReadFrom(r) +} + +func (s *Segment) IncrementDuration(d time.Duration) time.Duration { + s.duration += d + return s.duration +} + +func (s *Segment) CanWrite(d time.Duration) bool { + return s.targetDuration-s.duration >= d +} + +func (s *Segment) Duration() time.Duration { + return s.duration +} + +func (s *Segment) Len() int { + return s.data.Len() } type MP3HTTPSegmenter struct { decoder *mp3.Decoder } -func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan Segment, error) { - c := make(chan Segment) +func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { + c := make(chan *Segment) go func() { d := mp3.NewDecoder(r) @@ -48,12 +67,39 @@ func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan Segment, error) { skipped int ) + var ( + s *Segment + ) + for { if err := d.Decode(&v, &skipped); err != nil { + if err == io.EOF { + break + } log.Fatal(err) } - fmt.Println("decoded frame of", v.Size(), "bytes, duration", v.Duration(), ", skipped", skipped, "bytes") + if s != nil && !s.CanWrite(v.Duration()) { + // publish the current segment, and initialize a new one. + c <- s + s = nil + } + + if s == nil { + s = newSegment(DefaultTargetDuration, 1024) + } + + n, err := s.ReadFrom(v.Reader()) + if err != nil { + log.Fatal(err) // TODO: some proper error handling + } + + if n != int64(v.Size()) { + // TODO would this ever happen? + log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n) + } + + s.IncrementDuration(v.Duration()) } }() @@ -91,8 +137,6 @@ func main() { os.Exit(-1) } - log.Println("Open URL:", url) - client := &http.Client{} resp, err := client.Get(url) @@ -109,8 +153,8 @@ func main() { } go func() { - for segment := range segments { - log.Println("got segment", segment) + for s := range segments { + log.Println("got segment with duration", s.Duration(), "and len", s.Len(), "bytes") } }() diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000..a7f30fd --- /dev/null +++ b/main_test.go @@ -0,0 +1,22 @@ +package main + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSegment(t *testing.T) { + segment := newSegment(10*time.Second, 0) + + require.Equal(t, time.Duration(0), segment.Duration()) + require.True(t, segment.CanWrite(9*time.Second)) + require.True(t, segment.CanWrite(10*time.Second)) + require.False(t, segment.CanWrite(11*time.Second)) + + d := segment.IncrementDuration(10 * time.Second) + require.Equal(t, segment.Duration(), d) + require.Equal(t, 10*time.Second, segment.Duration()) + require.False(t, segment.CanWrite(1*time.Millisecond)) +} -- 2.40.1 From 86d01d3eff3e77d50051811e970c3fe93abcf145 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 6 Jul 2020 16:17:08 +0200 Subject: [PATCH 03/12] Start to sketch out a Playlist interface/struct --- main.go | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index e7e34d0..ca4416d 100644 --- a/main.go +++ b/main.go @@ -86,6 +86,7 @@ func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { } if s == nil { + // TODO what is a good initial capacity? s = newSegment(DefaultTargetDuration, 1024) } @@ -110,13 +111,38 @@ func newMP3HTTPSegmenter() *MP3HTTPSegmenter { return &MP3HTTPSegmenter{} } -// TODO +type Playlist interface { + // These could be moved to an interface? + Duration() time.Duration + TargetDuration() time.Duration +} -// A Segmenter class which allows the passing in of a Reader -// i.e. -// Segment(r io.Reader) (chan *Segment, error) -// This will create an mp3.NewDecoder(r) and store the returned decoder. -// As the Reader is read, the Segmenter will publish a stream of SegmentEvents +type MediaPlaylist struct { + Segments []*Segment +} + +func newMediaPlaylist() *MediaPlaylist { + return &MediaPlaylist{ + Segments: make([]*Segment, 0, 10), + } +} + +func (p *MediaPlaylist) Duration() time.Duration { + var t time.Duration + for _, s := range p.Segments { + t += s.Duration() + } + return t +} + +func (p *MediaPlaylist) Run() error { + for { + // TODO block here and listen to the channel of incoming segments. + // As the reader is Read and segments are produced, update the Playlist + // struct and possibly notify consumers. + // What would actually be a useful API and/or Go best practices? + } +} func main() { // TODO accept some flags with: -- 2.40.1 From 81b09338cce3d367ba99cde2998002fe0d9bf9ca Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 6 Jul 2020 23:17:01 +0200 Subject: [PATCH 04/12] Extend playlist functionality --- main.go | 36 ++++++++++++++++++++++++++++++++++-- main_test.go | 15 +++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index ca4416d..3ae5dde 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( ) const DefaultTargetDuration = 3 * time.Second +const DefaultPlaylistDuration = 20 * time.Second type Segment struct { targetDuration time.Duration @@ -96,7 +97,7 @@ func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { } if n != int64(v.Size()) { - // TODO would this ever happen? + // TODO would this ever happen? What should IncrementDuration be called with? log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n) } @@ -115,6 +116,7 @@ type Playlist interface { // These could be moved to an interface? Duration() time.Duration TargetDuration() time.Duration + AddSegment(s *Segment) error } type MediaPlaylist struct { @@ -128,8 +130,33 @@ func newMediaPlaylist() *MediaPlaylist { } func (p *MediaPlaylist) Duration() time.Duration { + return p.durationOf(p.Segments) +} + +func (p *MediaPlaylist) TargetDuration() time.Duration { + return DefaultPlaylistDuration +} + +func (p *MediaPlaylist) AddSegment(s *Segment) error { + p.Segments = append(p.Segments, s) + + if len(p.Segments) == 1 { + return nil + } + + for { + if p.durationOf(p.Segments[1:]) > p.TargetDuration() { + p.Segments = p.Segments[1:] + } + break + } + + return nil +} + +func (p *MediaPlaylist) durationOf(ss []*Segment) time.Duration { var t time.Duration - for _, s := range p.Segments { + for _, s := range ss { t += s.Duration() } return t @@ -144,6 +171,11 @@ func (p *MediaPlaylist) Run() error { } } +type PlaylistListener interface { + SegmentAdded(s *Segment) error + SegmentRemoved(s *Segment) error +} + func main() { // TODO accept some flags with: // URL - source of stream diff --git a/main_test.go b/main_test.go index a7f30fd..ff0fb2a 100644 --- a/main_test.go +++ b/main_test.go @@ -20,3 +20,18 @@ func TestSegment(t *testing.T) { require.Equal(t, 10*time.Second, segment.Duration()) require.False(t, segment.CanWrite(1*time.Millisecond)) } + +func TestMediaPlaylistImplements(t *testing.T) { + require.Implements(t, (*Playlist)(nil), newMediaPlaylist()) +} + +func TestMediaPlaylist(t *testing.T) { + playlist := newMediaPlaylist() + + for i := 0; i < 8; i++ { + s := newSegment(10*time.Second, 0) + s.IncrementDuration(3 * time.Second) + playlist.AddSegment(s) + } + require.Equal(t, 21*time.Second, playlist.Duration()) +} -- 2.40.1 From e836b44941ee8f827e9831b77c99b2158ec35db1 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Tue, 7 Jul 2020 16:35:20 +0200 Subject: [PATCH 05/12] Refactor into packages --- internal/playlist/playlist.go | 72 ++++++++ internal/playlist/playlist_test.go | 25 +++ internal/segment/segment.go | 109 +++++++++++ .../segment/segment_test.go | 21 +-- main.go | 170 +----------------- 5 files changed, 212 insertions(+), 185 deletions(-) create mode 100644 internal/playlist/playlist.go create mode 100644 internal/playlist/playlist_test.go create mode 100644 internal/segment/segment.go rename main_test.go => internal/segment/segment_test.go (54%) diff --git a/internal/playlist/playlist.go b/internal/playlist/playlist.go new file mode 100644 index 0000000..2062d2a --- /dev/null +++ b/internal/playlist/playlist.go @@ -0,0 +1,72 @@ +package playlist + +import ( + "segmento/internal/segment" + "time" +) + +const DefaultPlaylistDuration = 20 * time.Second + +type Playlist interface { + // These could be moved to an interface? + Duration() time.Duration + TargetDuration() time.Duration + AddSegment(s *segment.Segment) error +} + +type MediaPlaylist struct { + Segments []*segment.Segment +} + +func NewMediaPlaylist() *MediaPlaylist { + return &MediaPlaylist{ + Segments: make([]*segment.Segment, 0, 10), + } +} + +func (p *MediaPlaylist) Duration() time.Duration { + return p.durationOf(p.Segments) +} + +func (p *MediaPlaylist) TargetDuration() time.Duration { + return DefaultPlaylistDuration +} + +func (p *MediaPlaylist) AddSegment(s *segment.Segment) error { + p.Segments = append(p.Segments, s) + + if len(p.Segments) == 1 { + return nil + } + + for { + if p.durationOf(p.Segments[1:]) > p.TargetDuration() { + p.Segments = p.Segments[1:] + } + break + } + + return nil +} + +func (p *MediaPlaylist) durationOf(ss []*segment.Segment) time.Duration { + var t time.Duration + for _, s := range ss { + t += s.Duration() + } + return t +} + +func (p *MediaPlaylist) Run() error { + for { + // TODO block here and listen to the channel of incoming segments. + // As the reader is Read and segments are produced, update the Playlist + // struct and possibly notify consumers. + // What would actually be a useful API and/or Go best practices? + } +} + +type PlaylistListener interface { + SegmentAdded(s *segment.Segment) error + SegmentRemoved(s *segment.Segment) error +} diff --git a/internal/playlist/playlist_test.go b/internal/playlist/playlist_test.go new file mode 100644 index 0000000..06c62bc --- /dev/null +++ b/internal/playlist/playlist_test.go @@ -0,0 +1,25 @@ +package playlist_test + +import ( + "segmento/internal/playlist" + "segmento/internal/segment" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestMediaPlaylistImplements(t *testing.T) { + require.Implements(t, (*playlist.Playlist)(nil), playlist.NewMediaPlaylist()) +} + +func TestMediaPlaylist(t *testing.T) { + playlist := playlist.NewMediaPlaylist() + + for i := 0; i < 8; i++ { + s := segment.NewSegment(10*time.Second, 0) + s.IncrementDuration(3 * time.Second) + playlist.AddSegment(s) + } + require.Equal(t, 21*time.Second, playlist.Duration()) +} diff --git a/internal/segment/segment.go b/internal/segment/segment.go new file mode 100644 index 0000000..c9578b9 --- /dev/null +++ b/internal/segment/segment.go @@ -0,0 +1,109 @@ +package segment + +import ( + "bytes" + "io" + "log" + "time" + + "github.com/tcolgate/mp3" +) + +const DefaultTargetDuration = 3 * time.Second + +type Segment struct { + targetDuration time.Duration + duration time.Duration + data *bytes.Buffer +} + +// Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal +// buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size, +// depending on the implementation. +func NewSegment(targetDuration time.Duration, capacity int) *Segment { + return &Segment{ + data: bytes.NewBuffer(make([]byte, 0, capacity)), + duration: 0, + targetDuration: targetDuration, + } +} + +func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) { + return s.data.ReadFrom(r) +} + +func (s *Segment) IncrementDuration(d time.Duration) time.Duration { + s.duration += d + return s.duration +} + +func (s *Segment) CanWrite(d time.Duration) bool { + return s.targetDuration-s.duration >= d +} + +func (s *Segment) Duration() time.Duration { + return s.duration +} + +func (s *Segment) Len() int { + return s.data.Len() +} + +type MP3HTTPSegmenter struct { + decoder *mp3.Decoder +} + +func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { + c := make(chan *Segment) + + go func() { + d := mp3.NewDecoder(r) + + var ( + v mp3.Frame + skipped int + ) + + var ( + s *Segment + ) + + for { + if err := d.Decode(&v, &skipped); err != nil { + if err == io.EOF { + break + } + log.Fatal(err) + } + + if s != nil && !s.CanWrite(v.Duration()) { + // publish the current segment, and initialize a new one. + c <- s + s = nil + } + + if s == nil { + // TODO what is a good initial capacity? + s = NewSegment(DefaultTargetDuration, 1024) + } + + n, err := s.ReadFrom(v.Reader()) + if err != nil { + log.Fatal(err) // TODO: some proper error handling + } + + if n != int64(v.Size()) { + // TODO would this ever happen? What should IncrementDuration be called with? + log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n) + } + + s.IncrementDuration(v.Duration()) + } + }() + + return c, nil +} + +func NewMP3HTTPSegmenter() *MP3HTTPSegmenter { + return &MP3HTTPSegmenter{} +} diff --git a/main_test.go b/internal/segment/segment_test.go similarity index 54% rename from main_test.go rename to internal/segment/segment_test.go index ff0fb2a..7ae1ee5 100644 --- a/main_test.go +++ b/internal/segment/segment_test.go @@ -1,14 +1,16 @@ -package main +package segment_test import ( "testing" "time" + "segmento/internal/segment" + "github.com/stretchr/testify/require" ) func TestSegment(t *testing.T) { - segment := newSegment(10*time.Second, 0) + segment := segment.NewSegment(10*time.Second, 0) require.Equal(t, time.Duration(0), segment.Duration()) require.True(t, segment.CanWrite(9*time.Second)) @@ -20,18 +22,3 @@ func TestSegment(t *testing.T) { require.Equal(t, 10*time.Second, segment.Duration()) require.False(t, segment.CanWrite(1*time.Millisecond)) } - -func TestMediaPlaylistImplements(t *testing.T) { - require.Implements(t, (*Playlist)(nil), newMediaPlaylist()) -} - -func TestMediaPlaylist(t *testing.T) { - playlist := newMediaPlaylist() - - for i := 0; i < 8; i++ { - s := newSegment(10*time.Second, 0) - s.IncrementDuration(3 * time.Second) - playlist.AddSegment(s) - } - require.Equal(t, 21*time.Second, playlist.Duration()) -} diff --git a/main.go b/main.go index 3ae5dde..9a5cc21 100644 --- a/main.go +++ b/main.go @@ -1,181 +1,15 @@ package main import ( - "bytes" "flag" - "io" "log" "net/http" "os" "time" - "github.com/tcolgate/mp3" + "segmento/internal/segment" ) -const DefaultTargetDuration = 3 * time.Second -const DefaultPlaylistDuration = 20 * time.Second - -type Segment struct { - targetDuration time.Duration - duration time.Duration - data *bytes.Buffer -} - -// Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal -// buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size, -// depending on the implementation. -func newSegment(targetDuration time.Duration, capacity int) *Segment { - return &Segment{ - data: bytes.NewBuffer(make([]byte, 0, capacity)), - duration: 0, - targetDuration: targetDuration, - } -} - -func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) { - return s.data.ReadFrom(r) -} - -func (s *Segment) IncrementDuration(d time.Duration) time.Duration { - s.duration += d - return s.duration -} - -func (s *Segment) CanWrite(d time.Duration) bool { - return s.targetDuration-s.duration >= d -} - -func (s *Segment) Duration() time.Duration { - return s.duration -} - -func (s *Segment) Len() int { - return s.data.Len() -} - -type MP3HTTPSegmenter struct { - decoder *mp3.Decoder -} - -func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { - c := make(chan *Segment) - - go func() { - d := mp3.NewDecoder(r) - - var ( - v mp3.Frame - skipped int - ) - - var ( - s *Segment - ) - - for { - if err := d.Decode(&v, &skipped); err != nil { - if err == io.EOF { - break - } - log.Fatal(err) - } - - if s != nil && !s.CanWrite(v.Duration()) { - // publish the current segment, and initialize a new one. - c <- s - s = nil - } - - if s == nil { - // TODO what is a good initial capacity? - s = newSegment(DefaultTargetDuration, 1024) - } - - n, err := s.ReadFrom(v.Reader()) - if err != nil { - log.Fatal(err) // TODO: some proper error handling - } - - if n != int64(v.Size()) { - // TODO would this ever happen? What should IncrementDuration be called with? - log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n) - } - - s.IncrementDuration(v.Duration()) - } - }() - - return c, nil -} - -func newMP3HTTPSegmenter() *MP3HTTPSegmenter { - return &MP3HTTPSegmenter{} -} - -type Playlist interface { - // These could be moved to an interface? - Duration() time.Duration - TargetDuration() time.Duration - AddSegment(s *Segment) error -} - -type MediaPlaylist struct { - Segments []*Segment -} - -func newMediaPlaylist() *MediaPlaylist { - return &MediaPlaylist{ - Segments: make([]*Segment, 0, 10), - } -} - -func (p *MediaPlaylist) Duration() time.Duration { - return p.durationOf(p.Segments) -} - -func (p *MediaPlaylist) TargetDuration() time.Duration { - return DefaultPlaylistDuration -} - -func (p *MediaPlaylist) AddSegment(s *Segment) error { - p.Segments = append(p.Segments, s) - - if len(p.Segments) == 1 { - return nil - } - - for { - if p.durationOf(p.Segments[1:]) > p.TargetDuration() { - p.Segments = p.Segments[1:] - } - break - } - - return nil -} - -func (p *MediaPlaylist) durationOf(ss []*Segment) time.Duration { - var t time.Duration - for _, s := range ss { - t += s.Duration() - } - return t -} - -func (p *MediaPlaylist) Run() error { - for { - // TODO block here and listen to the channel of incoming segments. - // As the reader is Read and segments are produced, update the Playlist - // struct and possibly notify consumers. - // What would actually be a useful API and/or Go best practices? - } -} - -type PlaylistListener interface { - SegmentAdded(s *Segment) error - SegmentRemoved(s *Segment) error -} - func main() { // TODO accept some flags with: // URL - source of stream @@ -204,7 +38,7 @@ func main() { defer resp.Body.Close() - segmenter := newMP3HTTPSegmenter() + segmenter := segment.NewMP3HTTPSegmenter() segments, err := segmenter.Segment(resp.Body) if err != nil { log.Fatal(err) -- 2.40.1 From afbe654de08ea1422d4870d07d686b0a848ed78e Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Tue, 7 Jul 2020 17:19:17 +0200 Subject: [PATCH 06/12] Implement basic Playlist rendering --- internal/playlist/playlist.go | 25 ++++++++++++++++++++----- internal/playlist/playlist_test.go | 25 ++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/internal/playlist/playlist.go b/internal/playlist/playlist.go index 2062d2a..5997952 100644 --- a/internal/playlist/playlist.go +++ b/internal/playlist/playlist.go @@ -1,17 +1,24 @@ package playlist import ( + "fmt" "segmento/internal/segment" "time" ) const DefaultPlaylistDuration = 20 * time.Second +type PlaylistListener interface { + OnUpdate(p *Playlist) +} + type Playlist interface { // These could be moved to an interface? Duration() time.Duration TargetDuration() time.Duration AddSegment(s *segment.Segment) error + Render() string + //AddListener(l PlaylistListener) } type MediaPlaylist struct { @@ -57,6 +64,19 @@ func (p *MediaPlaylist) durationOf(ss []*segment.Segment) time.Duration { return t } +func (p *MediaPlaylist) Render() string { + var r string + r += "#EXTM3U\n" + r += "#EXT-X-VERSION:3\n" + r += "#EXT-X-TARGETDURATION:3\n" // TODO + for _, s := range p.Segments { + r += fmt.Sprintf("#EXTINF:%.05f\n", float32(s.Duration())/float32(time.Second)) + r += "http://www.example.com/x.mp3\n" + } + r += "#EXT-X-ENDLIST" + return r +} + func (p *MediaPlaylist) Run() error { for { // TODO block here and listen to the channel of incoming segments. @@ -65,8 +85,3 @@ func (p *MediaPlaylist) Run() error { // What would actually be a useful API and/or Go best practices? } } - -type PlaylistListener interface { - SegmentAdded(s *segment.Segment) error - SegmentRemoved(s *segment.Segment) error -} diff --git a/internal/playlist/playlist_test.go b/internal/playlist/playlist_test.go index 06c62bc..fe5092e 100644 --- a/internal/playlist/playlist_test.go +++ b/internal/playlist/playlist_test.go @@ -3,6 +3,7 @@ package playlist_test import ( "segmento/internal/playlist" "segmento/internal/segment" + "strings" "testing" "time" @@ -17,9 +18,31 @@ func TestMediaPlaylist(t *testing.T) { playlist := playlist.NewMediaPlaylist() for i := 0; i < 8; i++ { - s := segment.NewSegment(10*time.Second, 0) + s := segment.NewSegment(3*time.Second, 0) s.IncrementDuration(3 * time.Second) playlist.AddSegment(s) } require.Equal(t, 21*time.Second, playlist.Duration()) } + +func TestMediaPlaylistRender(t *testing.T) { + playlist := playlist.NewMediaPlaylist() + + s := segment.NewSegment(3*time.Second, 0) + s.IncrementDuration(3 * time.Second) + playlist.AddSegment(s) + + s = segment.NewSegment(3*time.Second, 0) + s.IncrementDuration(2700 * time.Millisecond) + playlist.AddSegment(s) + + lines := strings.Split(playlist.Render(), "\n") + + require.Equal(t, "#EXTM3U", lines[0]) + require.Equal(t, "#EXT-X-VERSION:3", lines[1]) + require.Equal(t, "#EXT-X-TARGETDURATION:3", lines[2]) + require.Equal(t, "#EXTINF:3.00000", lines[3]) + require.Equal(t, "http://www.example.com/x.mp3", lines[4]) + require.Equal(t, "#EXTINF:2.70000", lines[5]) + require.Equal(t, "http://www.example.com/x.mp3", lines[6]) +} -- 2.40.1 From 5727cd3c12219ccca078adab7b823d19be4dad06 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Tue, 7 Jul 2020 17:54:39 +0200 Subject: [PATCH 07/12] Add basic Playlist listener functionality --- internal/playlist/playlist.go | 38 ++++++++++++++++++++---------- internal/playlist/playlist_test.go | 20 ++++++++++++++++ 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/internal/playlist/playlist.go b/internal/playlist/playlist.go index 5997952..4e426c4 100644 --- a/internal/playlist/playlist.go +++ b/internal/playlist/playlist.go @@ -9,7 +9,7 @@ import ( const DefaultPlaylistDuration = 20 * time.Second type PlaylistListener interface { - OnUpdate(p *Playlist) + OnUpdate(p Playlist) } type Playlist interface { @@ -18,16 +18,18 @@ type Playlist interface { TargetDuration() time.Duration AddSegment(s *segment.Segment) error Render() string - //AddListener(l PlaylistListener) + AddListener(l PlaylistListener) } type MediaPlaylist struct { - Segments []*segment.Segment + Segments []*segment.Segment + Listeners []PlaylistListener } func NewMediaPlaylist() *MediaPlaylist { return &MediaPlaylist{ - Segments: make([]*segment.Segment, 0, 10), + Segments: make([]*segment.Segment, 0, 10), + Listeners: make([]PlaylistListener, 0), } } @@ -42,16 +44,16 @@ func (p *MediaPlaylist) TargetDuration() time.Duration { func (p *MediaPlaylist) AddSegment(s *segment.Segment) error { p.Segments = append(p.Segments, s) - if len(p.Segments) == 1 { - return nil + if len(p.Segments) > 1 { + for { + if p.durationOf(p.Segments[1:]) > p.TargetDuration() { + p.Segments = p.Segments[1:] + } + break + } } - for { - if p.durationOf(p.Segments[1:]) > p.TargetDuration() { - p.Segments = p.Segments[1:] - } - break - } + p.updateListeners() return nil } @@ -77,6 +79,18 @@ func (p *MediaPlaylist) Render() string { return r } +// Listeners + +func (p *MediaPlaylist) AddListener(l PlaylistListener) { + p.Listeners = append(p.Listeners, l) +} + +func (p *MediaPlaylist) updateListeners() { + for _, l := range p.Listeners { + l.OnUpdate(p) + } +} + func (p *MediaPlaylist) Run() error { for { // TODO block here and listen to the channel of incoming segments. diff --git a/internal/playlist/playlist_test.go b/internal/playlist/playlist_test.go index fe5092e..c4f46dd 100644 --- a/internal/playlist/playlist_test.go +++ b/internal/playlist/playlist_test.go @@ -46,3 +46,23 @@ func TestMediaPlaylistRender(t *testing.T) { require.Equal(t, "#EXTINF:2.70000", lines[5]) require.Equal(t, "http://www.example.com/x.mp3", lines[6]) } + +type listener struct { + count int +} + +func (l *listener) OnUpdate(p playlist.Playlist) { + l.count++ +} + +func TestMediaPlaylistListener(t *testing.T) { + playlist := playlist.NewMediaPlaylist() + l := &listener{} + playlist.AddListener(l) + + s := segment.NewSegment(3*time.Second, 0) + s.IncrementDuration(3 * time.Second) + playlist.AddSegment(s) + + require.Equal(t, 1, l.count) +} -- 2.40.1 From 9a50d96f0a43cee91f32b02b4619d11934fdfe8a Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Tue, 7 Jul 2020 23:28:50 +0200 Subject: [PATCH 08/12] Implement basic playlist rendering --- go.mod | 3 +++ go.sum | 25 +++++++++++++++++++ internal/playlist/playlist.go | 47 ++++++++++++++++++++++++----------- internal/s3/s3.go | 21 ++++++++++++++++ internal/s3/s3_test.go | 13 ++++++++++ main.go | 28 +++++++++++++++------ 6 files changed, 115 insertions(+), 22 deletions(-) create mode 100644 internal/s3/s3.go create mode 100644 internal/s3/s3_test.go diff --git a/go.mod b/go.mod index 2735aa5..e906851 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,9 @@ module segmento go 1.14 require ( + github.com/aws/aws-sdk-go v1.33.3 // indirect github.com/stretchr/testify v1.6.1 github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 + golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect + golang.org/x/text v0.3.3 // indirect ) diff --git a/go.sum b/go.sum index 4126d87..0c432bb 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,38 @@ +github.com/aws/aws-sdk-go v1.33.3 h1:wjhURjD/xuBBxdCan0F5yuW7qzkSlYY4/RdYGlyab9s= +github.com/aws/aws-sdk-go v1.33.3/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 h1:XQdibLKagjdevRB6vAjVY4qbSr8rQ610YzTkWcxzxSI= github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300/go.mod h1:FNa/dfN95vAYCNFrIKRrlRo+MBLbwmR9Asa5f2ljmBI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= +golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/playlist/playlist.go b/internal/playlist/playlist.go index 4e426c4..af94ae4 100644 --- a/internal/playlist/playlist.go +++ b/internal/playlist/playlist.go @@ -2,37 +2,51 @@ package playlist import ( "fmt" - "segmento/internal/segment" + "log" "time" + + segmentpkg "segmento/internal/segment" ) const DefaultPlaylistDuration = 20 * time.Second +type segment struct { + segmentpkg.Segment + seqId int +} + type PlaylistListener interface { OnUpdate(p Playlist) } type Playlist interface { - // These could be moved to an interface? Duration() time.Duration TargetDuration() time.Duration - AddSegment(s *segment.Segment) error Render() string + ReadSegments(chan *segmentpkg.Segment) error AddListener(l PlaylistListener) } type MediaPlaylist struct { - Segments []*segment.Segment + Segments []*segment Listeners []PlaylistListener + + seqId int } func NewMediaPlaylist() *MediaPlaylist { return &MediaPlaylist{ - Segments: make([]*segment.Segment, 0, 10), + Segments: make([]*segment, 0, 10), Listeners: make([]PlaylistListener, 0), } } +func (p *MediaPlaylist) nextSeqId() int { + next := p.seqId + p.seqId++ + return next +} + func (p *MediaPlaylist) Duration() time.Duration { return p.durationOf(p.Segments) } @@ -41,8 +55,13 @@ func (p *MediaPlaylist) TargetDuration() time.Duration { return DefaultPlaylistDuration } -func (p *MediaPlaylist) AddSegment(s *segment.Segment) error { - p.Segments = append(p.Segments, s) +// pass by pointer or value here? +// I think value is better, the struct is tiny and only contains a pointer +// to the data buffer +func (p *MediaPlaylist) AddSegment(s *segmentpkg.Segment) error { + nextId := p.nextSeqId() + ss := segment{*s, nextId} + p.Segments = append(p.Segments, &ss) if len(p.Segments) > 1 { for { @@ -58,7 +77,7 @@ func (p *MediaPlaylist) AddSegment(s *segment.Segment) error { return nil } -func (p *MediaPlaylist) durationOf(ss []*segment.Segment) time.Duration { +func (p *MediaPlaylist) durationOf(ss []*segment) time.Duration { var t time.Duration for _, s := range ss { t += s.Duration() @@ -91,11 +110,11 @@ func (p *MediaPlaylist) updateListeners() { } } -func (p *MediaPlaylist) Run() error { - for { - // TODO block here and listen to the channel of incoming segments. - // As the reader is Read and segments are produced, update the Playlist - // struct and possibly notify consumers. - // What would actually be a useful API and/or Go best practices? +func (p *MediaPlaylist) ReadSegments(segments chan *segmentpkg.Segment) error { + for s := range segments { + log.Println("got segment with duration", s.Duration(), "and len", s.Len(), "bytes") + p.AddSegment(s) } + log.Println("exiting ReadSegments") + return nil } diff --git a/internal/s3/s3.go b/internal/s3/s3.go new file mode 100644 index 0000000..2456007 --- /dev/null +++ b/internal/s3/s3.go @@ -0,0 +1,21 @@ +package s3 + +import "segmento/internal/playlist" + +//import ( +//"github.com/aws/aws-sdk-go/aws" +//"github.com/aws/aws-sdk-go/aws/session" +//"github.com/aws/aws-sdk-go/service/s3" +//) + +type S3PlaylistUploader struct { + AWSAccessKeyID string + AWSSecretAccessKey string + AWSRegion string + BucketName string + PathPrefix string +} + +func (u *S3PlaylistUploader) OnUpdate(p playlist.Playlist) { + +} diff --git a/internal/s3/s3_test.go b/internal/s3/s3_test.go new file mode 100644 index 0000000..570342f --- /dev/null +++ b/internal/s3/s3_test.go @@ -0,0 +1,13 @@ +package s3_test + +import ( + "segmento/internal/playlist" + "segmento/internal/s3" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestS3PlaylistUploaderImplements(t *testing.T) { + require.Implements(t, (*playlist.PlaylistListener)(nil), &s3.S3PlaylistUploader{}) +} diff --git a/main.go b/main.go index 9a5cc21..6520c1e 100644 --- a/main.go +++ b/main.go @@ -2,14 +2,25 @@ package main import ( "flag" + "fmt" "log" "net/http" "os" - "time" + "segmento/internal/playlist" "segmento/internal/segment" ) +type listener struct { +} + +func (l *listener) OnUpdate(p playlist.Playlist) { + fmt.Println("Playlist updated") + fmt.Println("Playlist:") + fmt.Println(p.Render()) + fmt.Println("") +} + func main() { // TODO accept some flags with: // URL - source of stream @@ -38,19 +49,20 @@ func main() { defer resp.Body.Close() + listener := listener{} + playlist := playlist.NewMediaPlaylist() + playlist.AddListener(&listener) + segmenter := segment.NewMP3HTTPSegmenter() segments, err := segmenter.Segment(resp.Body) if err != nil { log.Fatal(err) } - go func() { - for s := range segments { - log.Println("got segment with duration", s.Duration(), "and len", s.Len(), "bytes") - } - }() - - time.Sleep(10 * time.Second) + // Block while reading the segments into the playlist: + if err = playlist.ReadSegments(segments); err != nil { + log.Fatal(err) + } log.Println("exiting") } -- 2.40.1 From eedb2cd7e800274e4edc34494494a4e6a6080f75 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Wed, 8 Jul 2020 19:31:57 +0200 Subject: [PATCH 09/12] Refactor: transfer ownership of most things to Playlist --- internal/playlist/playlist.go | 120 ----------------- internal/playlist/playlist_test.go | 68 ---------- internal/s3/s3.go | 21 --- internal/s3/s3_test.go | 13 -- main.go | 34 ++--- .../segment/segment.go => pkg/media/media.go | 30 ++++- .../media/media_test.go | 11 +- pkg/playlist2/playlist.go | 125 ++++++++++++++++++ pkg/playlist2/playlist_test.go | 102 ++++++++++++++ 9 files changed, 271 insertions(+), 253 deletions(-) delete mode 100644 internal/playlist/playlist.go delete mode 100644 internal/playlist/playlist_test.go delete mode 100644 internal/s3/s3.go delete mode 100644 internal/s3/s3_test.go rename internal/segment/segment.go => pkg/media/media.go (76%) rename internal/segment/segment_test.go => pkg/media/media_test.go (71%) create mode 100644 pkg/playlist2/playlist.go create mode 100644 pkg/playlist2/playlist_test.go diff --git a/internal/playlist/playlist.go b/internal/playlist/playlist.go deleted file mode 100644 index af94ae4..0000000 --- a/internal/playlist/playlist.go +++ /dev/null @@ -1,120 +0,0 @@ -package playlist - -import ( - "fmt" - "log" - "time" - - segmentpkg "segmento/internal/segment" -) - -const DefaultPlaylistDuration = 20 * time.Second - -type segment struct { - segmentpkg.Segment - seqId int -} - -type PlaylistListener interface { - OnUpdate(p Playlist) -} - -type Playlist interface { - Duration() time.Duration - TargetDuration() time.Duration - Render() string - ReadSegments(chan *segmentpkg.Segment) error - AddListener(l PlaylistListener) -} - -type MediaPlaylist struct { - Segments []*segment - Listeners []PlaylistListener - - seqId int -} - -func NewMediaPlaylist() *MediaPlaylist { - return &MediaPlaylist{ - Segments: make([]*segment, 0, 10), - Listeners: make([]PlaylistListener, 0), - } -} - -func (p *MediaPlaylist) nextSeqId() int { - next := p.seqId - p.seqId++ - return next -} - -func (p *MediaPlaylist) Duration() time.Duration { - return p.durationOf(p.Segments) -} - -func (p *MediaPlaylist) TargetDuration() time.Duration { - return DefaultPlaylistDuration -} - -// pass by pointer or value here? -// I think value is better, the struct is tiny and only contains a pointer -// to the data buffer -func (p *MediaPlaylist) AddSegment(s *segmentpkg.Segment) error { - nextId := p.nextSeqId() - ss := segment{*s, nextId} - p.Segments = append(p.Segments, &ss) - - if len(p.Segments) > 1 { - for { - if p.durationOf(p.Segments[1:]) > p.TargetDuration() { - p.Segments = p.Segments[1:] - } - break - } - } - - p.updateListeners() - - return nil -} - -func (p *MediaPlaylist) durationOf(ss []*segment) time.Duration { - var t time.Duration - for _, s := range ss { - t += s.Duration() - } - return t -} - -func (p *MediaPlaylist) Render() string { - var r string - r += "#EXTM3U\n" - r += "#EXT-X-VERSION:3\n" - r += "#EXT-X-TARGETDURATION:3\n" // TODO - for _, s := range p.Segments { - r += fmt.Sprintf("#EXTINF:%.05f\n", float32(s.Duration())/float32(time.Second)) - r += "http://www.example.com/x.mp3\n" - } - r += "#EXT-X-ENDLIST" - return r -} - -// Listeners - -func (p *MediaPlaylist) AddListener(l PlaylistListener) { - p.Listeners = append(p.Listeners, l) -} - -func (p *MediaPlaylist) updateListeners() { - for _, l := range p.Listeners { - l.OnUpdate(p) - } -} - -func (p *MediaPlaylist) ReadSegments(segments chan *segmentpkg.Segment) error { - for s := range segments { - log.Println("got segment with duration", s.Duration(), "and len", s.Len(), "bytes") - p.AddSegment(s) - } - log.Println("exiting ReadSegments") - return nil -} diff --git a/internal/playlist/playlist_test.go b/internal/playlist/playlist_test.go deleted file mode 100644 index c4f46dd..0000000 --- a/internal/playlist/playlist_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package playlist_test - -import ( - "segmento/internal/playlist" - "segmento/internal/segment" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestMediaPlaylistImplements(t *testing.T) { - require.Implements(t, (*playlist.Playlist)(nil), playlist.NewMediaPlaylist()) -} - -func TestMediaPlaylist(t *testing.T) { - playlist := playlist.NewMediaPlaylist() - - for i := 0; i < 8; i++ { - s := segment.NewSegment(3*time.Second, 0) - s.IncrementDuration(3 * time.Second) - playlist.AddSegment(s) - } - require.Equal(t, 21*time.Second, playlist.Duration()) -} - -func TestMediaPlaylistRender(t *testing.T) { - playlist := playlist.NewMediaPlaylist() - - s := segment.NewSegment(3*time.Second, 0) - s.IncrementDuration(3 * time.Second) - playlist.AddSegment(s) - - s = segment.NewSegment(3*time.Second, 0) - s.IncrementDuration(2700 * time.Millisecond) - playlist.AddSegment(s) - - lines := strings.Split(playlist.Render(), "\n") - - require.Equal(t, "#EXTM3U", lines[0]) - require.Equal(t, "#EXT-X-VERSION:3", lines[1]) - require.Equal(t, "#EXT-X-TARGETDURATION:3", lines[2]) - require.Equal(t, "#EXTINF:3.00000", lines[3]) - require.Equal(t, "http://www.example.com/x.mp3", lines[4]) - require.Equal(t, "#EXTINF:2.70000", lines[5]) - require.Equal(t, "http://www.example.com/x.mp3", lines[6]) -} - -type listener struct { - count int -} - -func (l *listener) OnUpdate(p playlist.Playlist) { - l.count++ -} - -func TestMediaPlaylistListener(t *testing.T) { - playlist := playlist.NewMediaPlaylist() - l := &listener{} - playlist.AddListener(l) - - s := segment.NewSegment(3*time.Second, 0) - s.IncrementDuration(3 * time.Second) - playlist.AddSegment(s) - - require.Equal(t, 1, l.count) -} diff --git a/internal/s3/s3.go b/internal/s3/s3.go deleted file mode 100644 index 2456007..0000000 --- a/internal/s3/s3.go +++ /dev/null @@ -1,21 +0,0 @@ -package s3 - -import "segmento/internal/playlist" - -//import ( -//"github.com/aws/aws-sdk-go/aws" -//"github.com/aws/aws-sdk-go/aws/session" -//"github.com/aws/aws-sdk-go/service/s3" -//) - -type S3PlaylistUploader struct { - AWSAccessKeyID string - AWSSecretAccessKey string - AWSRegion string - BucketName string - PathPrefix string -} - -func (u *S3PlaylistUploader) OnUpdate(p playlist.Playlist) { - -} diff --git a/internal/s3/s3_test.go b/internal/s3/s3_test.go deleted file mode 100644 index 570342f..0000000 --- a/internal/s3/s3_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package s3_test - -import ( - "segmento/internal/playlist" - "segmento/internal/s3" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestS3PlaylistUploaderImplements(t *testing.T) { - require.Implements(t, (*playlist.PlaylistListener)(nil), &s3.S3PlaylistUploader{}) -} diff --git a/main.go b/main.go index 6520c1e..bf358c8 100644 --- a/main.go +++ b/main.go @@ -6,19 +6,18 @@ import ( "log" "net/http" "os" - - "segmento/internal/playlist" - "segmento/internal/segment" + "segmento/pkg/media" + "segmento/pkg/playlist2" ) -type listener struct { +type consumer struct{} + +func (c *consumer) PlaylistSegmentAdded(p playlist2.Playlist, s *playlist2.PlaylistSegment) { + fmt.Println("in PlaylistSegmentAdded") } -func (l *listener) OnUpdate(p playlist.Playlist) { - fmt.Println("Playlist updated") - fmt.Println("Playlist:") - fmt.Println(p.Render()) - fmt.Println("") +func (c *consumer) PlaylistUpdated(p playlist2.Playlist) { + fmt.Println("in PlaylistUpdated") } func main() { @@ -49,18 +48,11 @@ func main() { defer resp.Body.Close() - listener := listener{} - playlist := playlist.NewMediaPlaylist() - playlist.AddListener(&listener) - - segmenter := segment.NewMP3HTTPSegmenter() - segments, err := segmenter.Segment(resp.Body) - if err != nil { - log.Fatal(err) - } - - // Block while reading the segments into the playlist: - if err = playlist.ReadSegments(segments); err != nil { + segmenter := media.NewMP3Segmenter() + publisher := &media.FakePublisher{} + playlist := playlist2.NewMediaPlaylist(resp.Body, segmenter, publisher) + playlist.AddConsumer(&consumer{}) + if err = playlist.Run(); err != nil { log.Fatal(err) } diff --git a/internal/segment/segment.go b/pkg/media/media.go similarity index 76% rename from internal/segment/segment.go rename to pkg/media/media.go index c9578b9..1040e13 100644 --- a/internal/segment/segment.go +++ b/pkg/media/media.go @@ -1,4 +1,8 @@ -package segment +// Package media contains logic related to parsing and segmenting media +// streams, such as MP3 streams. +// +// It is depended upon by the playlist package. +package media import ( "bytes" @@ -11,9 +15,17 @@ import ( const DefaultTargetDuration = 3 * time.Second +type Segmenter interface { + Segment(r io.Reader) (chan *Segment, error) +} + +type SegmentPublisher interface { + Publish(s *Segment) (string, error) +} + type Segment struct { - targetDuration time.Duration duration time.Duration + targetDuration time.Duration data *bytes.Buffer } @@ -49,11 +61,11 @@ func (s *Segment) Len() int { return s.data.Len() } -type MP3HTTPSegmenter struct { +type MP3Segmenter struct { decoder *mp3.Decoder } -func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { +func (s *MP3Segmenter) Segment(r io.Reader) (chan *Segment, error) { c := make(chan *Segment) go func() { @@ -104,6 +116,12 @@ func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { return c, nil } -func NewMP3HTTPSegmenter() *MP3HTTPSegmenter { - return &MP3HTTPSegmenter{} +func NewMP3Segmenter() *MP3Segmenter { + return &MP3Segmenter{} +} + +type FakePublisher struct{} + +func (p *FakePublisher) Publish(s *Segment) (string, error) { + return "https://www.example.com/segment.mp3", nil } diff --git a/internal/segment/segment_test.go b/pkg/media/media_test.go similarity index 71% rename from internal/segment/segment_test.go rename to pkg/media/media_test.go index 7ae1ee5..0146356 100644 --- a/internal/segment/segment_test.go +++ b/pkg/media/media_test.go @@ -1,16 +1,19 @@ -package segment_test +package media_test import ( + "segmento/pkg/media" "testing" "time" - "segmento/internal/segment" - "github.com/stretchr/testify/require" ) +func TestSegmenterImplements(t *testing.T) { + require.Implements(t, (*media.Segmenter)(nil), new(media.MP3Segmenter)) +} + func TestSegment(t *testing.T) { - segment := segment.NewSegment(10*time.Second, 0) + segment := media.NewSegment(10*time.Second, 0) require.Equal(t, time.Duration(0), segment.Duration()) require.True(t, segment.CanWrite(9*time.Second)) diff --git a/pkg/playlist2/playlist.go b/pkg/playlist2/playlist.go new file mode 100644 index 0000000..7be555e --- /dev/null +++ b/pkg/playlist2/playlist.go @@ -0,0 +1,125 @@ +package playlist2 + +import ( + "fmt" + "io" + "segmento/pkg/media" + "time" +) + +const DefaultMaxSegments = 10 + +type PlaylistSegment struct { + media.Segment + url string + seqId int +} + +type Playlist interface { + Len() int + AddConsumer(c Consumer) + RemoveConsumer(c Consumer) error +} + +type Consumer interface { + PlaylistUpdated(p Playlist) + PlaylistSegmentAdded(p Playlist, s *PlaylistSegment) +} + +// so we know that we can publish a segment (i.e. make available a URL to access it from elsewhere) +// but what does it mean to publish a Playlist? +// A playlist contains lists of Segments but it wouldn't necessarily be published in the same location +// for example, Segments may be published to S3 but a playlist may be published periodically to a static +// hosting location elsewhere. + +type MediaPlaylist struct { + nextSeqId int + src io.Reader // read a stream of bytes e.g. MP3 + segmenter media.Segmenter // segment the incoming bytes. For now, up to the caller to provider a matching src and segmenter. + publisher media.SegmentPublisher // publish the segments somewhere (i.e. make available a URL with them) + segments []*PlaylistSegment // a slice of the last n segments + consumers map[Consumer]bool +} + +func NewMediaPlaylist(src io.Reader, segmenter media.Segmenter, publisher media.SegmentPublisher) *MediaPlaylist { + p := MediaPlaylist{ + src: src, + segmenter: segmenter, + publisher: publisher, + segments: make([]*PlaylistSegment, 0, 10), + consumers: make(map[Consumer]bool), + } + return &p +} + +func (p *MediaPlaylist) Len() int { + return len(p.segments) +} + +func (p *MediaPlaylist) AddConsumer(c Consumer) { + p.consumers[c] = true +} + +func (p *MediaPlaylist) RemoveConsumer(c Consumer) error { + delete(p.consumers, c) + return nil +} + +func (p *MediaPlaylist) Run() error { + segments, err := p.segmenter.Segment(p.src) + if err != nil { + return err + } + + for s := range segments { + if err = p.handleSegment(s); err != nil { + return err + } + } + + return nil +} + +func (p *MediaPlaylist) handleSegment(s *media.Segment) error { + // first, publish the segment: + url, err := p.publisher.Publish(s) + if err != nil { + return err + } + + // initialize a new playlist segment: + nextSeqId := p.nextSeqId + p.nextSeqId++ + ps := PlaylistSegment{ + Segment: *s, // TODO make the Segmenter publish values, not pointers + seqId: nextSeqId, + url: url, + } + + // append the playlist segment to our slice of segments: + p.segments = append(p.segments, &ps) + + // trim the start of the playlist if needed: + if len(p.segments) > DefaultMaxSegments { + p.segments = p.segments[len(p.segments)-DefaultMaxSegments:] + } + + for c, _ := range p.consumers { + c.PlaylistSegmentAdded(p, &ps) + } + + return nil +} + +func (p *MediaPlaylist) Render() string { + var r string + r += "#EXTM3U\n" + r += "#EXT-X-VERSION:3\n" + r += "#EXT-X-TARGETDURATION:3\n" // TODO + for _, s := range p.segments { + r += fmt.Sprintf("#EXTINF:%.05f\n", float32(s.Duration())/float32(time.Second)) + r += "http://www.example.com/x.mp3\n" + } + r += "#EXT-X-ENDLIST" + return r +} diff --git a/pkg/playlist2/playlist_test.go b/pkg/playlist2/playlist_test.go new file mode 100644 index 0000000..bfef669 --- /dev/null +++ b/pkg/playlist2/playlist_test.go @@ -0,0 +1,102 @@ +package playlist2_test + +import ( + "io" + "segmento/pkg/media" + "segmento/pkg/playlist2" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type FakeReader struct { +} + +func (r *FakeReader) Read([]byte) (int, error) { + return 0, nil +} + +type FakeSegmenter struct { + count int +} + +func (s *FakeSegmenter) Segment(r io.Reader) (chan *media.Segment, error) { + c := make(chan *media.Segment) + + go func() { + dur := 2500 * time.Millisecond + for i := 0; i < s.count; i++ { + segment := media.NewSegment(dur, 0) + segment.IncrementDuration(dur) + c <- segment + } + close(c) + }() + + return c, nil +} + +type FakeSegmentPublisher struct { + count int +} + +func (p *FakeSegmentPublisher) Publish(s *media.Segment) (string, error) { + p.count++ + return "", nil +} + +type consumer struct { + pCount, sCount int +} + +func (c *consumer) PlaylistUpdated(p playlist2.Playlist) { + c.pCount++ +} + +func (c *consumer) PlaylistSegmentAdded(p playlist2.Playlist, s *playlist2.PlaylistSegment) { + c.sCount++ +} + +func TestMediaPlaylistImplements(t *testing.T) { + require.Implements(t, (*playlist2.Playlist)(nil), new(playlist2.MediaPlaylist)) +} + +func TestMediaPlaylist(t *testing.T) { + publisher := &FakeSegmentPublisher{} + playlist := playlist2.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{3}, publisher) + + err := playlist.Run() + require.NoError(t, err) + require.Equal(t, 3, playlist.Len()) + require.Equal(t, 3, publisher.count) +} + +func TestMediaPlaylistRender(t *testing.T) { + playlist := playlist2.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{2}, &FakeSegmentPublisher{}) + err := playlist.Run() + require.NoError(t, err) + + lines := strings.Split(playlist.Render(), "\n") + + require.Equal(t, "#EXTM3U", lines[0]) + require.Equal(t, "#EXT-X-VERSION:3", lines[1]) + require.Equal(t, "#EXT-X-TARGETDURATION:3", lines[2]) + require.Equal(t, "#EXTINF:2.50000", lines[3]) + require.Equal(t, "http://www.example.com/x.mp3", lines[4]) + require.Equal(t, "#EXTINF:2.50000", lines[5]) + require.Equal(t, "http://www.example.com/x.mp3", lines[6]) +} + +func TestMediaPlaylistConsumer(t *testing.T) { + consumer := &consumer{} + require.Implements(t, (*playlist2.Consumer)(nil), consumer) + + playlist := playlist2.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{4}, &FakeSegmentPublisher{}) + playlist.AddConsumer(consumer) + err := playlist.Run() + require.NoError(t, err) + require.Equal(t, 4, consumer.sCount) + require.Equal(t, 0, consumer.pCount) // TODO +} -- 2.40.1 From 7a294371b5669daa5f1be2f1cf449dad5bb0fe63 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Wed, 8 Jul 2020 19:46:14 +0200 Subject: [PATCH 10/12] Rename package playlist2 -> playlist --- main.go | 8 ++++---- pkg/{playlist2 => playlist}/playlist.go | 2 +- pkg/{playlist2 => playlist}/playlist_test.go | 18 +++++++++--------- 3 files changed, 14 insertions(+), 14 deletions(-) rename pkg/{playlist2 => playlist}/playlist.go (99%) rename pkg/{playlist2 => playlist}/playlist_test.go (74%) diff --git a/main.go b/main.go index bf358c8..176d6f2 100644 --- a/main.go +++ b/main.go @@ -7,16 +7,16 @@ import ( "net/http" "os" "segmento/pkg/media" - "segmento/pkg/playlist2" + "segmento/pkg/playlist" ) type consumer struct{} -func (c *consumer) PlaylistSegmentAdded(p playlist2.Playlist, s *playlist2.PlaylistSegment) { +func (c *consumer) PlaylistSegmentAdded(p playlist.Playlist, s *playlist.PlaylistSegment) { fmt.Println("in PlaylistSegmentAdded") } -func (c *consumer) PlaylistUpdated(p playlist2.Playlist) { +func (c *consumer) PlaylistUpdated(p playlist.Playlist) { fmt.Println("in PlaylistUpdated") } @@ -50,7 +50,7 @@ func main() { segmenter := media.NewMP3Segmenter() publisher := &media.FakePublisher{} - playlist := playlist2.NewMediaPlaylist(resp.Body, segmenter, publisher) + playlist := playlist.NewMediaPlaylist(resp.Body, segmenter, publisher) playlist.AddConsumer(&consumer{}) if err = playlist.Run(); err != nil { log.Fatal(err) diff --git a/pkg/playlist2/playlist.go b/pkg/playlist/playlist.go similarity index 99% rename from pkg/playlist2/playlist.go rename to pkg/playlist/playlist.go index 7be555e..8da014b 100644 --- a/pkg/playlist2/playlist.go +++ b/pkg/playlist/playlist.go @@ -1,4 +1,4 @@ -package playlist2 +package playlist import ( "fmt" diff --git a/pkg/playlist2/playlist_test.go b/pkg/playlist/playlist_test.go similarity index 74% rename from pkg/playlist2/playlist_test.go rename to pkg/playlist/playlist_test.go index bfef669..99aa6d8 100644 --- a/pkg/playlist2/playlist_test.go +++ b/pkg/playlist/playlist_test.go @@ -1,9 +1,9 @@ -package playlist2_test +package playlist_test import ( "io" "segmento/pkg/media" - "segmento/pkg/playlist2" + "segmento/pkg/playlist" "strings" "testing" "time" @@ -51,21 +51,21 @@ type consumer struct { pCount, sCount int } -func (c *consumer) PlaylistUpdated(p playlist2.Playlist) { +func (c *consumer) PlaylistUpdated(p playlist.Playlist) { c.pCount++ } -func (c *consumer) PlaylistSegmentAdded(p playlist2.Playlist, s *playlist2.PlaylistSegment) { +func (c *consumer) PlaylistSegmentAdded(p playlist.Playlist, s *playlist.PlaylistSegment) { c.sCount++ } func TestMediaPlaylistImplements(t *testing.T) { - require.Implements(t, (*playlist2.Playlist)(nil), new(playlist2.MediaPlaylist)) + require.Implements(t, (*playlist.Playlist)(nil), new(playlist.MediaPlaylist)) } func TestMediaPlaylist(t *testing.T) { publisher := &FakeSegmentPublisher{} - playlist := playlist2.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{3}, publisher) + playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{3}, publisher) err := playlist.Run() require.NoError(t, err) @@ -74,7 +74,7 @@ func TestMediaPlaylist(t *testing.T) { } func TestMediaPlaylistRender(t *testing.T) { - playlist := playlist2.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{2}, &FakeSegmentPublisher{}) + playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{2}, &FakeSegmentPublisher{}) err := playlist.Run() require.NoError(t, err) @@ -91,9 +91,9 @@ func TestMediaPlaylistRender(t *testing.T) { func TestMediaPlaylistConsumer(t *testing.T) { consumer := &consumer{} - require.Implements(t, (*playlist2.Consumer)(nil), consumer) + require.Implements(t, (*playlist.Consumer)(nil), consumer) - playlist := playlist2.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{4}, &FakeSegmentPublisher{}) + playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{4}, &FakeSegmentPublisher{}) playlist.AddConsumer(consumer) err := playlist.Run() require.NoError(t, err) -- 2.40.1 From 49693d8e16abd467f7eeb86f56471df77a97281e Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Wed, 8 Jul 2020 19:53:07 +0200 Subject: [PATCH 11/12] go mod tidy --- go.mod | 3 --- go.sum | 26 +------------------------- 2 files changed, 1 insertion(+), 28 deletions(-) diff --git a/go.mod b/go.mod index e906851..2735aa5 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,6 @@ module segmento go 1.14 require ( - github.com/aws/aws-sdk-go v1.33.3 // indirect github.com/stretchr/testify v1.6.1 github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 - golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect - golang.org/x/text v0.3.3 // indirect ) diff --git a/go.sum b/go.sum index 0c432bb..c177cf4 100644 --- a/go.sum +++ b/go.sum @@ -1,38 +1,14 @@ -github.com/aws/aws-sdk-go v1.33.3 h1:wjhURjD/xuBBxdCan0F5yuW7qzkSlYY4/RdYGlyab9s= -github.com/aws/aws-sdk-go v1.33.3/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= -github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= -github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= -github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 h1:XQdibLKagjdevRB6vAjVY4qbSr8rQ610YzTkWcxzxSI= github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300/go.mod h1:FNa/dfN95vAYCNFrIKRrlRo+MBLbwmR9Asa5f2ljmBI= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= -golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -- 2.40.1 From 715ead52415fe002d0f8e6118c84c784ccfeb478 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Fri, 10 Jul 2020 10:35:03 +0200 Subject: [PATCH 12/12] Start on S3 side --- go.mod | 1 + go.sum | 12 ++++++++++++ main.go | 15 +++------------ pkg/playlist/playlist.go | 3 +++ pkg/s3/s3.go | 37 +++++++++++++++++++++++++++++++++++++ 5 files changed, 56 insertions(+), 12 deletions(-) create mode 100644 pkg/s3/s3.go diff --git a/go.mod b/go.mod index 2735aa5..64c1c5f 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module segmento go 1.14 require ( + github.com/aws/aws-sdk-go v1.33.4 github.com/stretchr/testify v1.6.1 github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 ) diff --git a/go.sum b/go.sum index c177cf4..9e95fd7 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,26 @@ +github.com/aws/aws-sdk-go v1.33.4 h1:lhVZe2TkSjJz26jPBCBAvJvAy70Yxxlbm/Ciw1gmyRY= +github.com/aws/aws-sdk-go v1.33.4/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 h1:XQdibLKagjdevRB6vAjVY4qbSr8rQ610YzTkWcxzxSI= github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300/go.mod h1:FNa/dfN95vAYCNFrIKRrlRo+MBLbwmR9Asa5f2ljmBI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 176d6f2..57191ca 100644 --- a/main.go +++ b/main.go @@ -2,24 +2,14 @@ package main import ( "flag" - "fmt" "log" "net/http" "os" "segmento/pkg/media" "segmento/pkg/playlist" + "segmento/pkg/s3" ) -type consumer struct{} - -func (c *consumer) PlaylistSegmentAdded(p playlist.Playlist, s *playlist.PlaylistSegment) { - fmt.Println("in PlaylistSegmentAdded") -} - -func (c *consumer) PlaylistUpdated(p playlist.Playlist) { - fmt.Println("in PlaylistUpdated") -} - func main() { // TODO accept some flags with: // URL - source of stream @@ -51,7 +41,8 @@ func main() { segmenter := media.NewMP3Segmenter() publisher := &media.FakePublisher{} playlist := playlist.NewMediaPlaylist(resp.Body, segmenter, publisher) - playlist.AddConsumer(&consumer{}) + playlist.AddConsumer(&s3.Consumer{}) + if err = playlist.Run(); err != nil { log.Fatal(err) } diff --git a/pkg/playlist/playlist.go b/pkg/playlist/playlist.go index 8da014b..82cef5b 100644 --- a/pkg/playlist/playlist.go +++ b/pkg/playlist/playlist.go @@ -41,6 +41,9 @@ type MediaPlaylist struct { consumers map[Consumer]bool } +// TODO does it make sense to do 50% dependency injection and 50% consumers here? +// Why not just pass everything through the consumer? +// Or how about define these methods on the interface and force implementations to provide them? func NewMediaPlaylist(src io.Reader, segmenter media.Segmenter, publisher media.SegmentPublisher) *MediaPlaylist { p := MediaPlaylist{ src: src, diff --git a/pkg/s3/s3.go b/pkg/s3/s3.go new file mode 100644 index 0000000..92b1fec --- /dev/null +++ b/pkg/s3/s3.go @@ -0,0 +1,37 @@ +package s3 + +import ( + "fmt" + "log" + "segmento/pkg/playlist" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" +) + +const DefaultAwsRegion = "eu-west-1" + +func init() { + sess, err := session.NewSession(&aws.Config{Region: aws.String(DefaultAwsRegion)}) + if err != nil { + log.Fatal(err) + } +} + +type Consumer struct { + S3Bucket string + S3PathPrefix string +} + +func NewConsumer(bucket string, pathPrefix string) *Consumer { + c := Consumer{bucket, pathPrefix} + return &c +} + +func (c *Consumer) PlaylistUpdated(p playlist.Playlist) { + fmt.Println("s3: PlaylistUpdated") +} + +func (c *Consumer) PlaylistSegmentAdded(p playlist.Playlist, s *playlist.PlaylistSegment) { + fmt.Println("s3: PlaylistSegmentAdded") +} -- 2.40.1