From e836b44941ee8f827e9831b77c99b2158ec35db1 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Tue, 7 Jul 2020 16:35:20 +0200 Subject: [PATCH] Refactor into packages --- internal/playlist/playlist.go | 72 ++++++++ internal/playlist/playlist_test.go | 25 +++ internal/segment/segment.go | 109 +++++++++++ .../segment/segment_test.go | 21 +-- main.go | 170 +----------------- 5 files changed, 212 insertions(+), 185 deletions(-) create mode 100644 internal/playlist/playlist.go create mode 100644 internal/playlist/playlist_test.go create mode 100644 internal/segment/segment.go rename main_test.go => internal/segment/segment_test.go (54%) diff --git a/internal/playlist/playlist.go b/internal/playlist/playlist.go new file mode 100644 index 0000000..2062d2a --- /dev/null +++ b/internal/playlist/playlist.go @@ -0,0 +1,72 @@ +package playlist + +import ( + "segmento/internal/segment" + "time" +) + +const DefaultPlaylistDuration = 20 * time.Second + +type Playlist interface { + // These could be moved to an interface? + Duration() time.Duration + TargetDuration() time.Duration + AddSegment(s *segment.Segment) error +} + +type MediaPlaylist struct { + Segments []*segment.Segment +} + +func NewMediaPlaylist() *MediaPlaylist { + return &MediaPlaylist{ + Segments: make([]*segment.Segment, 0, 10), + } +} + +func (p *MediaPlaylist) Duration() time.Duration { + return p.durationOf(p.Segments) +} + +func (p *MediaPlaylist) TargetDuration() time.Duration { + return DefaultPlaylistDuration +} + +func (p *MediaPlaylist) AddSegment(s *segment.Segment) error { + p.Segments = append(p.Segments, s) + + if len(p.Segments) == 1 { + return nil + } + + for { + if p.durationOf(p.Segments[1:]) > p.TargetDuration() { + p.Segments = p.Segments[1:] + } + break + } + + return nil +} + +func (p *MediaPlaylist) durationOf(ss []*segment.Segment) time.Duration { + var t time.Duration + for _, s := range ss { + t += s.Duration() + } + return t +} + +func (p *MediaPlaylist) Run() error { + for { + // TODO block here and listen to the channel of incoming segments. + // As the reader is Read and segments are produced, update the Playlist + // struct and possibly notify consumers. + // What would actually be a useful API and/or Go best practices? + } +} + +type PlaylistListener interface { + SegmentAdded(s *segment.Segment) error + SegmentRemoved(s *segment.Segment) error +} diff --git a/internal/playlist/playlist_test.go b/internal/playlist/playlist_test.go new file mode 100644 index 0000000..06c62bc --- /dev/null +++ b/internal/playlist/playlist_test.go @@ -0,0 +1,25 @@ +package playlist_test + +import ( + "segmento/internal/playlist" + "segmento/internal/segment" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestMediaPlaylistImplements(t *testing.T) { + require.Implements(t, (*playlist.Playlist)(nil), playlist.NewMediaPlaylist()) +} + +func TestMediaPlaylist(t *testing.T) { + playlist := playlist.NewMediaPlaylist() + + for i := 0; i < 8; i++ { + s := segment.NewSegment(10*time.Second, 0) + s.IncrementDuration(3 * time.Second) + playlist.AddSegment(s) + } + require.Equal(t, 21*time.Second, playlist.Duration()) +} diff --git a/internal/segment/segment.go b/internal/segment/segment.go new file mode 100644 index 0000000..c9578b9 --- /dev/null +++ b/internal/segment/segment.go @@ -0,0 +1,109 @@ +package segment + +import ( + "bytes" + "io" + "log" + "time" + + "github.com/tcolgate/mp3" +) + +const DefaultTargetDuration = 3 * time.Second + +type Segment struct { + targetDuration time.Duration + duration time.Duration + data *bytes.Buffer +} + +// Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal +// buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size, +// depending on the implementation. +func NewSegment(targetDuration time.Duration, capacity int) *Segment { + return &Segment{ + data: bytes.NewBuffer(make([]byte, 0, capacity)), + duration: 0, + targetDuration: targetDuration, + } +} + +func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) { + return s.data.ReadFrom(r) +} + +func (s *Segment) IncrementDuration(d time.Duration) time.Duration { + s.duration += d + return s.duration +} + +func (s *Segment) CanWrite(d time.Duration) bool { + return s.targetDuration-s.duration >= d +} + +func (s *Segment) Duration() time.Duration { + return s.duration +} + +func (s *Segment) Len() int { + return s.data.Len() +} + +type MP3HTTPSegmenter struct { + decoder *mp3.Decoder +} + +func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { + c := make(chan *Segment) + + go func() { + d := mp3.NewDecoder(r) + + var ( + v mp3.Frame + skipped int + ) + + var ( + s *Segment + ) + + for { + if err := d.Decode(&v, &skipped); err != nil { + if err == io.EOF { + break + } + log.Fatal(err) + } + + if s != nil && !s.CanWrite(v.Duration()) { + // publish the current segment, and initialize a new one. + c <- s + s = nil + } + + if s == nil { + // TODO what is a good initial capacity? + s = NewSegment(DefaultTargetDuration, 1024) + } + + n, err := s.ReadFrom(v.Reader()) + if err != nil { + log.Fatal(err) // TODO: some proper error handling + } + + if n != int64(v.Size()) { + // TODO would this ever happen? What should IncrementDuration be called with? + log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n) + } + + s.IncrementDuration(v.Duration()) + } + }() + + return c, nil +} + +func NewMP3HTTPSegmenter() *MP3HTTPSegmenter { + return &MP3HTTPSegmenter{} +} diff --git a/main_test.go b/internal/segment/segment_test.go similarity index 54% rename from main_test.go rename to internal/segment/segment_test.go index ff0fb2a..7ae1ee5 100644 --- a/main_test.go +++ b/internal/segment/segment_test.go @@ -1,14 +1,16 @@ -package main +package segment_test import ( "testing" "time" + "segmento/internal/segment" + "github.com/stretchr/testify/require" ) func TestSegment(t *testing.T) { - segment := newSegment(10*time.Second, 0) + segment := segment.NewSegment(10*time.Second, 0) require.Equal(t, time.Duration(0), segment.Duration()) require.True(t, segment.CanWrite(9*time.Second)) @@ -20,18 +22,3 @@ func TestSegment(t *testing.T) { require.Equal(t, 10*time.Second, segment.Duration()) require.False(t, segment.CanWrite(1*time.Millisecond)) } - -func TestMediaPlaylistImplements(t *testing.T) { - require.Implements(t, (*Playlist)(nil), newMediaPlaylist()) -} - -func TestMediaPlaylist(t *testing.T) { - playlist := newMediaPlaylist() - - for i := 0; i < 8; i++ { - s := newSegment(10*time.Second, 0) - s.IncrementDuration(3 * time.Second) - playlist.AddSegment(s) - } - require.Equal(t, 21*time.Second, playlist.Duration()) -} diff --git a/main.go b/main.go index 3ae5dde..9a5cc21 100644 --- a/main.go +++ b/main.go @@ -1,181 +1,15 @@ package main import ( - "bytes" "flag" - "io" "log" "net/http" "os" "time" - "github.com/tcolgate/mp3" + "segmento/internal/segment" ) -const DefaultTargetDuration = 3 * time.Second -const DefaultPlaylistDuration = 20 * time.Second - -type Segment struct { - targetDuration time.Duration - duration time.Duration - data *bytes.Buffer -} - -// Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal -// buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size, -// depending on the implementation. -func newSegment(targetDuration time.Duration, capacity int) *Segment { - return &Segment{ - data: bytes.NewBuffer(make([]byte, 0, capacity)), - duration: 0, - targetDuration: targetDuration, - } -} - -func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) { - return s.data.ReadFrom(r) -} - -func (s *Segment) IncrementDuration(d time.Duration) time.Duration { - s.duration += d - return s.duration -} - -func (s *Segment) CanWrite(d time.Duration) bool { - return s.targetDuration-s.duration >= d -} - -func (s *Segment) Duration() time.Duration { - return s.duration -} - -func (s *Segment) Len() int { - return s.data.Len() -} - -type MP3HTTPSegmenter struct { - decoder *mp3.Decoder -} - -func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { - c := make(chan *Segment) - - go func() { - d := mp3.NewDecoder(r) - - var ( - v mp3.Frame - skipped int - ) - - var ( - s *Segment - ) - - for { - if err := d.Decode(&v, &skipped); err != nil { - if err == io.EOF { - break - } - log.Fatal(err) - } - - if s != nil && !s.CanWrite(v.Duration()) { - // publish the current segment, and initialize a new one. - c <- s - s = nil - } - - if s == nil { - // TODO what is a good initial capacity? - s = newSegment(DefaultTargetDuration, 1024) - } - - n, err := s.ReadFrom(v.Reader()) - if err != nil { - log.Fatal(err) // TODO: some proper error handling - } - - if n != int64(v.Size()) { - // TODO would this ever happen? What should IncrementDuration be called with? - log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n) - } - - s.IncrementDuration(v.Duration()) - } - }() - - return c, nil -} - -func newMP3HTTPSegmenter() *MP3HTTPSegmenter { - return &MP3HTTPSegmenter{} -} - -type Playlist interface { - // These could be moved to an interface? - Duration() time.Duration - TargetDuration() time.Duration - AddSegment(s *Segment) error -} - -type MediaPlaylist struct { - Segments []*Segment -} - -func newMediaPlaylist() *MediaPlaylist { - return &MediaPlaylist{ - Segments: make([]*Segment, 0, 10), - } -} - -func (p *MediaPlaylist) Duration() time.Duration { - return p.durationOf(p.Segments) -} - -func (p *MediaPlaylist) TargetDuration() time.Duration { - return DefaultPlaylistDuration -} - -func (p *MediaPlaylist) AddSegment(s *Segment) error { - p.Segments = append(p.Segments, s) - - if len(p.Segments) == 1 { - return nil - } - - for { - if p.durationOf(p.Segments[1:]) > p.TargetDuration() { - p.Segments = p.Segments[1:] - } - break - } - - return nil -} - -func (p *MediaPlaylist) durationOf(ss []*Segment) time.Duration { - var t time.Duration - for _, s := range ss { - t += s.Duration() - } - return t -} - -func (p *MediaPlaylist) Run() error { - for { - // TODO block here and listen to the channel of incoming segments. - // As the reader is Read and segments are produced, update the Playlist - // struct and possibly notify consumers. - // What would actually be a useful API and/or Go best practices? - } -} - -type PlaylistListener interface { - SegmentAdded(s *Segment) error - SegmentRemoved(s *Segment) error -} - func main() { // TODO accept some flags with: // URL - source of stream @@ -204,7 +38,7 @@ func main() { defer resp.Body.Close() - segmenter := newMP3HTTPSegmenter() + segmenter := segment.NewMP3HTTPSegmenter() segments, err := segmenter.Segment(resp.Body) if err != nil { log.Fatal(err)