Self review #1

Open
rob wants to merge 12 commits from dev into master
5 changed files with 212 additions and 185 deletions
Showing only changes of commit e836b44941 - Show all commits

View File

@ -0,0 +1,72 @@
package playlist
import (
"segmento/internal/segment"
"time"
)
const DefaultPlaylistDuration = 20 * time.Second
type Playlist interface {
// These could be moved to an interface?
Duration() time.Duration
TargetDuration() time.Duration
AddSegment(s *segment.Segment) error
}
type MediaPlaylist struct {
Segments []*segment.Segment
}
func NewMediaPlaylist() *MediaPlaylist {
return &MediaPlaylist{
Segments: make([]*segment.Segment, 0, 10),
}
}
func (p *MediaPlaylist) Duration() time.Duration {
return p.durationOf(p.Segments)
}
func (p *MediaPlaylist) TargetDuration() time.Duration {
return DefaultPlaylistDuration
}
func (p *MediaPlaylist) AddSegment(s *segment.Segment) error {
p.Segments = append(p.Segments, s)
if len(p.Segments) == 1 {
return nil
}
for {
if p.durationOf(p.Segments[1:]) > p.TargetDuration() {
p.Segments = p.Segments[1:]
}
break
}
return nil
}
func (p *MediaPlaylist) durationOf(ss []*segment.Segment) time.Duration {
var t time.Duration
for _, s := range ss {
t += s.Duration()
}
return t
}
func (p *MediaPlaylist) Run() error {
for {
// TODO block here and listen to the channel of incoming segments.
// As the reader is Read and segments are produced, update the Playlist
// struct and possibly notify consumers.
// What would actually be a useful API and/or Go best practices?
}
}
type PlaylistListener interface {
SegmentAdded(s *segment.Segment) error
SegmentRemoved(s *segment.Segment) error
}

View File

@ -0,0 +1,25 @@
package playlist_test
import (
"segmento/internal/playlist"
"segmento/internal/segment"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestMediaPlaylistImplements(t *testing.T) {
require.Implements(t, (*playlist.Playlist)(nil), playlist.NewMediaPlaylist())
}
func TestMediaPlaylist(t *testing.T) {
playlist := playlist.NewMediaPlaylist()
for i := 0; i < 8; i++ {
s := segment.NewSegment(10*time.Second, 0)
s.IncrementDuration(3 * time.Second)
playlist.AddSegment(s)
}
require.Equal(t, 21*time.Second, playlist.Duration())
}

109
internal/segment/segment.go Normal file
View File

@ -0,0 +1,109 @@
package segment
import (
"bytes"
"io"
"log"
"time"
"github.com/tcolgate/mp3"
)
const DefaultTargetDuration = 3 * time.Second
type Segment struct {
targetDuration time.Duration
duration time.Duration
data *bytes.Buffer
}
// Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal
// buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size,
// depending on the implementation.
func NewSegment(targetDuration time.Duration, capacity int) *Segment {
return &Segment{
data: bytes.NewBuffer(make([]byte, 0, capacity)),
duration: 0,
targetDuration: targetDuration,
}
}
func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) {
return s.data.ReadFrom(r)
}
func (s *Segment) IncrementDuration(d time.Duration) time.Duration {
s.duration += d
return s.duration
}
func (s *Segment) CanWrite(d time.Duration) bool {
return s.targetDuration-s.duration >= d
}
func (s *Segment) Duration() time.Duration {
return s.duration
}
func (s *Segment) Len() int {
return s.data.Len()
}
type MP3HTTPSegmenter struct {
decoder *mp3.Decoder
}
func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) {
c := make(chan *Segment)
go func() {
d := mp3.NewDecoder(r)
var (
v mp3.Frame
skipped int
)
var (
s *Segment
)
for {
if err := d.Decode(&v, &skipped); err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
if s != nil && !s.CanWrite(v.Duration()) {
// publish the current segment, and initialize a new one.
c <- s
s = nil
}
if s == nil {
// TODO what is a good initial capacity?
s = NewSegment(DefaultTargetDuration, 1024)
}
n, err := s.ReadFrom(v.Reader())
if err != nil {
log.Fatal(err) // TODO: some proper error handling
}
if n != int64(v.Size()) {
// TODO would this ever happen? What should IncrementDuration be called with?
log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n)
}
s.IncrementDuration(v.Duration())
}
}()
return c, nil
}
func NewMP3HTTPSegmenter() *MP3HTTPSegmenter {
return &MP3HTTPSegmenter{}
}

View File

@ -1,14 +1,16 @@
package main
package segment_test
import (
"testing"
"time"
"segmento/internal/segment"
"github.com/stretchr/testify/require"
)
func TestSegment(t *testing.T) {
segment := newSegment(10*time.Second, 0)
segment := segment.NewSegment(10*time.Second, 0)
require.Equal(t, time.Duration(0), segment.Duration())
require.True(t, segment.CanWrite(9*time.Second))
@ -20,18 +22,3 @@ func TestSegment(t *testing.T) {
require.Equal(t, 10*time.Second, segment.Duration())
require.False(t, segment.CanWrite(1*time.Millisecond))
}
func TestMediaPlaylistImplements(t *testing.T) {
require.Implements(t, (*Playlist)(nil), newMediaPlaylist())
}
func TestMediaPlaylist(t *testing.T) {
playlist := newMediaPlaylist()
for i := 0; i < 8; i++ {
s := newSegment(10*time.Second, 0)
s.IncrementDuration(3 * time.Second)
playlist.AddSegment(s)
}
require.Equal(t, 21*time.Second, playlist.Duration())
}

170
main.go
View File

@ -1,181 +1,15 @@
package main
import (
"bytes"
"flag"
"io"
"log"
"net/http"
"os"
"time"
"github.com/tcolgate/mp3"
"segmento/internal/segment"
)
const DefaultTargetDuration = 3 * time.Second
const DefaultPlaylistDuration = 20 * time.Second
type Segment struct {
targetDuration time.Duration
duration time.Duration
data *bytes.Buffer
}
// Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal
// buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size,
// depending on the implementation.
func newSegment(targetDuration time.Duration, capacity int) *Segment {
return &Segment{
data: bytes.NewBuffer(make([]byte, 0, capacity)),
duration: 0,
targetDuration: targetDuration,
}
}
func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) {
return s.data.ReadFrom(r)
}
func (s *Segment) IncrementDuration(d time.Duration) time.Duration {
s.duration += d
return s.duration
}
func (s *Segment) CanWrite(d time.Duration) bool {
return s.targetDuration-s.duration >= d
}
func (s *Segment) Duration() time.Duration {
return s.duration
}
func (s *Segment) Len() int {
return s.data.Len()
}
type MP3HTTPSegmenter struct {
decoder *mp3.Decoder
}
func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) {
c := make(chan *Segment)
go func() {
d := mp3.NewDecoder(r)
var (
v mp3.Frame
skipped int
)
var (
s *Segment
)
for {
if err := d.Decode(&v, &skipped); err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
if s != nil && !s.CanWrite(v.Duration()) {
// publish the current segment, and initialize a new one.
c <- s
s = nil
}
if s == nil {
// TODO what is a good initial capacity?
s = newSegment(DefaultTargetDuration, 1024)
}
n, err := s.ReadFrom(v.Reader())
if err != nil {
log.Fatal(err) // TODO: some proper error handling
}
if n != int64(v.Size()) {
// TODO would this ever happen? What should IncrementDuration be called with?
log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n)
}
s.IncrementDuration(v.Duration())
}
}()
return c, nil
}
func newMP3HTTPSegmenter() *MP3HTTPSegmenter {
return &MP3HTTPSegmenter{}
}
type Playlist interface {
// These could be moved to an interface?
Duration() time.Duration
TargetDuration() time.Duration
AddSegment(s *Segment) error
}
type MediaPlaylist struct {
Segments []*Segment
}
func newMediaPlaylist() *MediaPlaylist {
return &MediaPlaylist{
Segments: make([]*Segment, 0, 10),
}
}
func (p *MediaPlaylist) Duration() time.Duration {
return p.durationOf(p.Segments)
}
func (p *MediaPlaylist) TargetDuration() time.Duration {
return DefaultPlaylistDuration
}
func (p *MediaPlaylist) AddSegment(s *Segment) error {
p.Segments = append(p.Segments, s)
if len(p.Segments) == 1 {
return nil
}
for {
if p.durationOf(p.Segments[1:]) > p.TargetDuration() {
p.Segments = p.Segments[1:]
}
break
}
return nil
}
func (p *MediaPlaylist) durationOf(ss []*Segment) time.Duration {
var t time.Duration
for _, s := range ss {
t += s.Duration()
}
return t
}
func (p *MediaPlaylist) Run() error {
for {
// TODO block here and listen to the channel of incoming segments.
// As the reader is Read and segments are produced, update the Playlist
// struct and possibly notify consumers.
// What would actually be a useful API and/or Go best practices?
}
}
type PlaylistListener interface {
SegmentAdded(s *Segment) error
SegmentRemoved(s *Segment) error
}
func main() {
// TODO accept some flags with:
// URL - source of stream
@ -204,7 +38,7 @@ func main() {
defer resp.Body.Close()
segmenter := newMP3HTTPSegmenter()
segmenter := segment.NewMP3HTTPSegmenter()
segments, err := segmenter.Segment(resp.Body)
if err != nil {
log.Fatal(err)