From eedb2cd7e800274e4edc34494494a4e6a6080f75 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Wed, 8 Jul 2020 19:31:57 +0200 Subject: [PATCH] Refactor: transfer ownership of most things to Playlist --- internal/playlist/playlist.go | 120 ----------------- internal/playlist/playlist_test.go | 68 ---------- internal/s3/s3.go | 21 --- internal/s3/s3_test.go | 13 -- main.go | 34 ++--- .../segment/segment.go => pkg/media/media.go | 30 ++++- .../media/media_test.go | 11 +- pkg/playlist2/playlist.go | 125 ++++++++++++++++++ pkg/playlist2/playlist_test.go | 102 ++++++++++++++ 9 files changed, 271 insertions(+), 253 deletions(-) delete mode 100644 internal/playlist/playlist.go delete mode 100644 internal/playlist/playlist_test.go delete mode 100644 internal/s3/s3.go delete mode 100644 internal/s3/s3_test.go rename internal/segment/segment.go => pkg/media/media.go (76%) rename internal/segment/segment_test.go => pkg/media/media_test.go (71%) create mode 100644 pkg/playlist2/playlist.go create mode 100644 pkg/playlist2/playlist_test.go diff --git a/internal/playlist/playlist.go b/internal/playlist/playlist.go deleted file mode 100644 index af94ae4..0000000 --- a/internal/playlist/playlist.go +++ /dev/null @@ -1,120 +0,0 @@ -package playlist - -import ( - "fmt" - "log" - "time" - - segmentpkg "segmento/internal/segment" -) - -const DefaultPlaylistDuration = 20 * time.Second - -type segment struct { - segmentpkg.Segment - seqId int -} - -type PlaylistListener interface { - OnUpdate(p Playlist) -} - -type Playlist interface { - Duration() time.Duration - TargetDuration() time.Duration - Render() string - ReadSegments(chan *segmentpkg.Segment) error - AddListener(l PlaylistListener) -} - -type MediaPlaylist struct { - Segments []*segment - Listeners []PlaylistListener - - seqId int -} - -func NewMediaPlaylist() *MediaPlaylist { - return &MediaPlaylist{ - Segments: make([]*segment, 0, 10), - Listeners: make([]PlaylistListener, 0), - } -} - -func (p *MediaPlaylist) nextSeqId() int { - next := p.seqId - p.seqId++ - return next -} - -func (p *MediaPlaylist) Duration() time.Duration { - return p.durationOf(p.Segments) -} - -func (p *MediaPlaylist) TargetDuration() time.Duration { - return DefaultPlaylistDuration -} - -// pass by pointer or value here? -// I think value is better, the struct is tiny and only contains a pointer -// to the data buffer -func (p *MediaPlaylist) AddSegment(s *segmentpkg.Segment) error { - nextId := p.nextSeqId() - ss := segment{*s, nextId} - p.Segments = append(p.Segments, &ss) - - if len(p.Segments) > 1 { - for { - if p.durationOf(p.Segments[1:]) > p.TargetDuration() { - p.Segments = p.Segments[1:] - } - break - } - } - - p.updateListeners() - - return nil -} - -func (p *MediaPlaylist) durationOf(ss []*segment) time.Duration { - var t time.Duration - for _, s := range ss { - t += s.Duration() - } - return t -} - -func (p *MediaPlaylist) Render() string { - var r string - r += "#EXTM3U\n" - r += "#EXT-X-VERSION:3\n" - r += "#EXT-X-TARGETDURATION:3\n" // TODO - for _, s := range p.Segments { - r += fmt.Sprintf("#EXTINF:%.05f\n", float32(s.Duration())/float32(time.Second)) - r += "http://www.example.com/x.mp3\n" - } - r += "#EXT-X-ENDLIST" - return r -} - -// Listeners - -func (p *MediaPlaylist) AddListener(l PlaylistListener) { - p.Listeners = append(p.Listeners, l) -} - -func (p *MediaPlaylist) updateListeners() { - for _, l := range p.Listeners { - l.OnUpdate(p) - } -} - -func (p *MediaPlaylist) ReadSegments(segments chan *segmentpkg.Segment) error { - for s := range segments { - log.Println("got segment with duration", s.Duration(), "and len", s.Len(), "bytes") - p.AddSegment(s) - } - log.Println("exiting ReadSegments") - return nil -} diff --git a/internal/playlist/playlist_test.go b/internal/playlist/playlist_test.go deleted file mode 100644 index c4f46dd..0000000 --- a/internal/playlist/playlist_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package playlist_test - -import ( - "segmento/internal/playlist" - "segmento/internal/segment" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestMediaPlaylistImplements(t *testing.T) { - require.Implements(t, (*playlist.Playlist)(nil), playlist.NewMediaPlaylist()) -} - -func TestMediaPlaylist(t *testing.T) { - playlist := playlist.NewMediaPlaylist() - - for i := 0; i < 8; i++ { - s := segment.NewSegment(3*time.Second, 0) - s.IncrementDuration(3 * time.Second) - playlist.AddSegment(s) - } - require.Equal(t, 21*time.Second, playlist.Duration()) -} - -func TestMediaPlaylistRender(t *testing.T) { - playlist := playlist.NewMediaPlaylist() - - s := segment.NewSegment(3*time.Second, 0) - s.IncrementDuration(3 * time.Second) - playlist.AddSegment(s) - - s = segment.NewSegment(3*time.Second, 0) - s.IncrementDuration(2700 * time.Millisecond) - playlist.AddSegment(s) - - lines := strings.Split(playlist.Render(), "\n") - - require.Equal(t, "#EXTM3U", lines[0]) - require.Equal(t, "#EXT-X-VERSION:3", lines[1]) - require.Equal(t, "#EXT-X-TARGETDURATION:3", lines[2]) - require.Equal(t, "#EXTINF:3.00000", lines[3]) - require.Equal(t, "http://www.example.com/x.mp3", lines[4]) - require.Equal(t, "#EXTINF:2.70000", lines[5]) - require.Equal(t, "http://www.example.com/x.mp3", lines[6]) -} - -type listener struct { - count int -} - -func (l *listener) OnUpdate(p playlist.Playlist) { - l.count++ -} - -func TestMediaPlaylistListener(t *testing.T) { - playlist := playlist.NewMediaPlaylist() - l := &listener{} - playlist.AddListener(l) - - s := segment.NewSegment(3*time.Second, 0) - s.IncrementDuration(3 * time.Second) - playlist.AddSegment(s) - - require.Equal(t, 1, l.count) -} diff --git a/internal/s3/s3.go b/internal/s3/s3.go deleted file mode 100644 index 2456007..0000000 --- a/internal/s3/s3.go +++ /dev/null @@ -1,21 +0,0 @@ -package s3 - -import "segmento/internal/playlist" - -//import ( -//"github.com/aws/aws-sdk-go/aws" -//"github.com/aws/aws-sdk-go/aws/session" -//"github.com/aws/aws-sdk-go/service/s3" -//) - -type S3PlaylistUploader struct { - AWSAccessKeyID string - AWSSecretAccessKey string - AWSRegion string - BucketName string - PathPrefix string -} - -func (u *S3PlaylistUploader) OnUpdate(p playlist.Playlist) { - -} diff --git a/internal/s3/s3_test.go b/internal/s3/s3_test.go deleted file mode 100644 index 570342f..0000000 --- a/internal/s3/s3_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package s3_test - -import ( - "segmento/internal/playlist" - "segmento/internal/s3" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestS3PlaylistUploaderImplements(t *testing.T) { - require.Implements(t, (*playlist.PlaylistListener)(nil), &s3.S3PlaylistUploader{}) -} diff --git a/main.go b/main.go index 6520c1e..bf358c8 100644 --- a/main.go +++ b/main.go @@ -6,19 +6,18 @@ import ( "log" "net/http" "os" - - "segmento/internal/playlist" - "segmento/internal/segment" + "segmento/pkg/media" + "segmento/pkg/playlist2" ) -type listener struct { +type consumer struct{} + +func (c *consumer) PlaylistSegmentAdded(p playlist2.Playlist, s *playlist2.PlaylistSegment) { + fmt.Println("in PlaylistSegmentAdded") } -func (l *listener) OnUpdate(p playlist.Playlist) { - fmt.Println("Playlist updated") - fmt.Println("Playlist:") - fmt.Println(p.Render()) - fmt.Println("") +func (c *consumer) PlaylistUpdated(p playlist2.Playlist) { + fmt.Println("in PlaylistUpdated") } func main() { @@ -49,18 +48,11 @@ func main() { defer resp.Body.Close() - listener := listener{} - playlist := playlist.NewMediaPlaylist() - playlist.AddListener(&listener) - - segmenter := segment.NewMP3HTTPSegmenter() - segments, err := segmenter.Segment(resp.Body) - if err != nil { - log.Fatal(err) - } - - // Block while reading the segments into the playlist: - if err = playlist.ReadSegments(segments); err != nil { + segmenter := media.NewMP3Segmenter() + publisher := &media.FakePublisher{} + playlist := playlist2.NewMediaPlaylist(resp.Body, segmenter, publisher) + playlist.AddConsumer(&consumer{}) + if err = playlist.Run(); err != nil { log.Fatal(err) } diff --git a/internal/segment/segment.go b/pkg/media/media.go similarity index 76% rename from internal/segment/segment.go rename to pkg/media/media.go index c9578b9..1040e13 100644 --- a/internal/segment/segment.go +++ b/pkg/media/media.go @@ -1,4 +1,8 @@ -package segment +// Package media contains logic related to parsing and segmenting media +// streams, such as MP3 streams. +// +// It is depended upon by the playlist package. +package media import ( "bytes" @@ -11,9 +15,17 @@ import ( const DefaultTargetDuration = 3 * time.Second +type Segmenter interface { + Segment(r io.Reader) (chan *Segment, error) +} + +type SegmentPublisher interface { + Publish(s *Segment) (string, error) +} + type Segment struct { - targetDuration time.Duration duration time.Duration + targetDuration time.Duration data *bytes.Buffer } @@ -49,11 +61,11 @@ func (s *Segment) Len() int { return s.data.Len() } -type MP3HTTPSegmenter struct { +type MP3Segmenter struct { decoder *mp3.Decoder } -func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { +func (s *MP3Segmenter) Segment(r io.Reader) (chan *Segment, error) { c := make(chan *Segment) go func() { @@ -104,6 +116,12 @@ func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { return c, nil } -func NewMP3HTTPSegmenter() *MP3HTTPSegmenter { - return &MP3HTTPSegmenter{} +func NewMP3Segmenter() *MP3Segmenter { + return &MP3Segmenter{} +} + +type FakePublisher struct{} + +func (p *FakePublisher) Publish(s *Segment) (string, error) { + return "https://www.example.com/segment.mp3", nil } diff --git a/internal/segment/segment_test.go b/pkg/media/media_test.go similarity index 71% rename from internal/segment/segment_test.go rename to pkg/media/media_test.go index 7ae1ee5..0146356 100644 --- a/internal/segment/segment_test.go +++ b/pkg/media/media_test.go @@ -1,16 +1,19 @@ -package segment_test +package media_test import ( + "segmento/pkg/media" "testing" "time" - "segmento/internal/segment" - "github.com/stretchr/testify/require" ) +func TestSegmenterImplements(t *testing.T) { + require.Implements(t, (*media.Segmenter)(nil), new(media.MP3Segmenter)) +} + func TestSegment(t *testing.T) { - segment := segment.NewSegment(10*time.Second, 0) + segment := media.NewSegment(10*time.Second, 0) require.Equal(t, time.Duration(0), segment.Duration()) require.True(t, segment.CanWrite(9*time.Second)) diff --git a/pkg/playlist2/playlist.go b/pkg/playlist2/playlist.go new file mode 100644 index 0000000..7be555e --- /dev/null +++ b/pkg/playlist2/playlist.go @@ -0,0 +1,125 @@ +package playlist2 + +import ( + "fmt" + "io" + "segmento/pkg/media" + "time" +) + +const DefaultMaxSegments = 10 + +type PlaylistSegment struct { + media.Segment + url string + seqId int +} + +type Playlist interface { + Len() int + AddConsumer(c Consumer) + RemoveConsumer(c Consumer) error +} + +type Consumer interface { + PlaylistUpdated(p Playlist) + PlaylistSegmentAdded(p Playlist, s *PlaylistSegment) +} + +// so we know that we can publish a segment (i.e. make available a URL to access it from elsewhere) +// but what does it mean to publish a Playlist? +// A playlist contains lists of Segments but it wouldn't necessarily be published in the same location +// for example, Segments may be published to S3 but a playlist may be published periodically to a static +// hosting location elsewhere. + +type MediaPlaylist struct { + nextSeqId int + src io.Reader // read a stream of bytes e.g. MP3 + segmenter media.Segmenter // segment the incoming bytes. For now, up to the caller to provider a matching src and segmenter. + publisher media.SegmentPublisher // publish the segments somewhere (i.e. make available a URL with them) + segments []*PlaylistSegment // a slice of the last n segments + consumers map[Consumer]bool +} + +func NewMediaPlaylist(src io.Reader, segmenter media.Segmenter, publisher media.SegmentPublisher) *MediaPlaylist { + p := MediaPlaylist{ + src: src, + segmenter: segmenter, + publisher: publisher, + segments: make([]*PlaylistSegment, 0, 10), + consumers: make(map[Consumer]bool), + } + return &p +} + +func (p *MediaPlaylist) Len() int { + return len(p.segments) +} + +func (p *MediaPlaylist) AddConsumer(c Consumer) { + p.consumers[c] = true +} + +func (p *MediaPlaylist) RemoveConsumer(c Consumer) error { + delete(p.consumers, c) + return nil +} + +func (p *MediaPlaylist) Run() error { + segments, err := p.segmenter.Segment(p.src) + if err != nil { + return err + } + + for s := range segments { + if err = p.handleSegment(s); err != nil { + return err + } + } + + return nil +} + +func (p *MediaPlaylist) handleSegment(s *media.Segment) error { + // first, publish the segment: + url, err := p.publisher.Publish(s) + if err != nil { + return err + } + + // initialize a new playlist segment: + nextSeqId := p.nextSeqId + p.nextSeqId++ + ps := PlaylistSegment{ + Segment: *s, // TODO make the Segmenter publish values, not pointers + seqId: nextSeqId, + url: url, + } + + // append the playlist segment to our slice of segments: + p.segments = append(p.segments, &ps) + + // trim the start of the playlist if needed: + if len(p.segments) > DefaultMaxSegments { + p.segments = p.segments[len(p.segments)-DefaultMaxSegments:] + } + + for c, _ := range p.consumers { + c.PlaylistSegmentAdded(p, &ps) + } + + return nil +} + +func (p *MediaPlaylist) Render() string { + var r string + r += "#EXTM3U\n" + r += "#EXT-X-VERSION:3\n" + r += "#EXT-X-TARGETDURATION:3\n" // TODO + for _, s := range p.segments { + r += fmt.Sprintf("#EXTINF:%.05f\n", float32(s.Duration())/float32(time.Second)) + r += "http://www.example.com/x.mp3\n" + } + r += "#EXT-X-ENDLIST" + return r +} diff --git a/pkg/playlist2/playlist_test.go b/pkg/playlist2/playlist_test.go new file mode 100644 index 0000000..bfef669 --- /dev/null +++ b/pkg/playlist2/playlist_test.go @@ -0,0 +1,102 @@ +package playlist2_test + +import ( + "io" + "segmento/pkg/media" + "segmento/pkg/playlist2" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type FakeReader struct { +} + +func (r *FakeReader) Read([]byte) (int, error) { + return 0, nil +} + +type FakeSegmenter struct { + count int +} + +func (s *FakeSegmenter) Segment(r io.Reader) (chan *media.Segment, error) { + c := make(chan *media.Segment) + + go func() { + dur := 2500 * time.Millisecond + for i := 0; i < s.count; i++ { + segment := media.NewSegment(dur, 0) + segment.IncrementDuration(dur) + c <- segment + } + close(c) + }() + + return c, nil +} + +type FakeSegmentPublisher struct { + count int +} + +func (p *FakeSegmentPublisher) Publish(s *media.Segment) (string, error) { + p.count++ + return "", nil +} + +type consumer struct { + pCount, sCount int +} + +func (c *consumer) PlaylistUpdated(p playlist2.Playlist) { + c.pCount++ +} + +func (c *consumer) PlaylistSegmentAdded(p playlist2.Playlist, s *playlist2.PlaylistSegment) { + c.sCount++ +} + +func TestMediaPlaylistImplements(t *testing.T) { + require.Implements(t, (*playlist2.Playlist)(nil), new(playlist2.MediaPlaylist)) +} + +func TestMediaPlaylist(t *testing.T) { + publisher := &FakeSegmentPublisher{} + playlist := playlist2.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{3}, publisher) + + err := playlist.Run() + require.NoError(t, err) + require.Equal(t, 3, playlist.Len()) + require.Equal(t, 3, publisher.count) +} + +func TestMediaPlaylistRender(t *testing.T) { + playlist := playlist2.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{2}, &FakeSegmentPublisher{}) + err := playlist.Run() + require.NoError(t, err) + + lines := strings.Split(playlist.Render(), "\n") + + require.Equal(t, "#EXTM3U", lines[0]) + require.Equal(t, "#EXT-X-VERSION:3", lines[1]) + require.Equal(t, "#EXT-X-TARGETDURATION:3", lines[2]) + require.Equal(t, "#EXTINF:2.50000", lines[3]) + require.Equal(t, "http://www.example.com/x.mp3", lines[4]) + require.Equal(t, "#EXTINF:2.50000", lines[5]) + require.Equal(t, "http://www.example.com/x.mp3", lines[6]) +} + +func TestMediaPlaylistConsumer(t *testing.T) { + consumer := &consumer{} + require.Implements(t, (*playlist2.Consumer)(nil), consumer) + + playlist := playlist2.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{4}, &FakeSegmentPublisher{}) + playlist.AddConsumer(consumer) + err := playlist.Run() + require.NoError(t, err) + require.Equal(t, 4, consumer.sCount) + require.Equal(t, 0, consumer.pCount) // TODO +}