Self review #1

Open
rob wants to merge 12 commits from dev into master
9 changed files with 508 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/segmento

9
go.mod Normal file
View File

@ -0,0 +1,9 @@
module segmento
go 1.14
require (
github.com/aws/aws-sdk-go v1.33.4
github.com/stretchr/testify v1.6.1
github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300
)

26
go.sum Normal file
View File

@ -0,0 +1,26 @@
github.com/aws/aws-sdk-go v1.33.4 h1:lhVZe2TkSjJz26jPBCBAvJvAy70Yxxlbm/Ciw1gmyRY=
github.com/aws/aws-sdk-go v1.33.4/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 h1:XQdibLKagjdevRB6vAjVY4qbSr8rQ610YzTkWcxzxSI=
github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300/go.mod h1:FNa/dfN95vAYCNFrIKRrlRo+MBLbwmR9Asa5f2ljmBI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

51
main.go Normal file
View File

@ -0,0 +1,51 @@
package main
import (
"flag"
"log"
"net/http"
"os"
"segmento/pkg/media"
"segmento/pkg/playlist"
"segmento/pkg/s3"
)
func main() {
// TODO accept some flags with:
// URL - source of stream
// TargetLength - length of segments in seconds
// Output
// -d some_dir/ => output playlist and chunks to this directory, cleaning up old files from time to time.
// -b 0.0.0.0:3000 => serve playlist and chunks from an HTTP server bound to this address
var url string
flag.StringVar(&url, "url", "", "URL of MP3 stream")
flag.Parse()
if url == "" {
log.Println("Invalid arguments")
flag.PrintDefaults()
os.Exit(-1)
}
client := &http.Client{}
resp, err := client.Get(url)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
segmenter := media.NewMP3Segmenter()
publisher := &media.FakePublisher{}
playlist := playlist.NewMediaPlaylist(resp.Body, segmenter, publisher)
playlist.AddConsumer(&s3.Consumer{})
if err = playlist.Run(); err != nil {
log.Fatal(err)
}
log.Println("exiting")
}

127
pkg/media/media.go Normal file
View File

@ -0,0 +1,127 @@
// Package media contains logic related to parsing and segmenting media
// streams, such as MP3 streams.
//
// It is depended upon by the playlist package.
package media
import (
"bytes"
"io"
"log"
"time"
"github.com/tcolgate/mp3"
)
const DefaultTargetDuration = 3 * time.Second
type Segmenter interface {
Segment(r io.Reader) (chan *Segment, error)
}
type SegmentPublisher interface {
Publish(s *Segment) (string, error)
}
type Segment struct {
duration time.Duration
targetDuration time.Duration
data *bytes.Buffer
}
// Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal
// buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size,
// depending on the implementation.
func NewSegment(targetDuration time.Duration, capacity int) *Segment {
return &Segment{
data: bytes.NewBuffer(make([]byte, 0, capacity)),
duration: 0,
targetDuration: targetDuration,
}
}
func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) {
return s.data.ReadFrom(r)
}
func (s *Segment) IncrementDuration(d time.Duration) time.Duration {
s.duration += d
return s.duration
}
func (s *Segment) CanWrite(d time.Duration) bool {
return s.targetDuration-s.duration >= d
}
func (s *Segment) Duration() time.Duration {
return s.duration
}
func (s *Segment) Len() int {
return s.data.Len()
}
type MP3Segmenter struct {
decoder *mp3.Decoder
}
func (s *MP3Segmenter) Segment(r io.Reader) (chan *Segment, error) {
c := make(chan *Segment)
go func() {
d := mp3.NewDecoder(r)
var (
v mp3.Frame
skipped int
)
var (
s *Segment
)
for {
if err := d.Decode(&v, &skipped); err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
if s != nil && !s.CanWrite(v.Duration()) {
// publish the current segment, and initialize a new one.
c <- s
s = nil
}
if s == nil {
// TODO what is a good initial capacity?
s = NewSegment(DefaultTargetDuration, 1024)
}
n, err := s.ReadFrom(v.Reader())
if err != nil {
log.Fatal(err) // TODO: some proper error handling
}
if n != int64(v.Size()) {
// TODO would this ever happen? What should IncrementDuration be called with?
log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n)
}
s.IncrementDuration(v.Duration())
}
}()
return c, nil
}
func NewMP3Segmenter() *MP3Segmenter {
return &MP3Segmenter{}
}
type FakePublisher struct{}
func (p *FakePublisher) Publish(s *Segment) (string, error) {
return "https://www.example.com/segment.mp3", nil
}

27
pkg/media/media_test.go Normal file
View File

@ -0,0 +1,27 @@
package media_test
import (
"segmento/pkg/media"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestSegmenterImplements(t *testing.T) {
require.Implements(t, (*media.Segmenter)(nil), new(media.MP3Segmenter))
}
func TestSegment(t *testing.T) {
segment := media.NewSegment(10*time.Second, 0)
require.Equal(t, time.Duration(0), segment.Duration())
require.True(t, segment.CanWrite(9*time.Second))
require.True(t, segment.CanWrite(10*time.Second))
require.False(t, segment.CanWrite(11*time.Second))
d := segment.IncrementDuration(10 * time.Second)
require.Equal(t, segment.Duration(), d)
require.Equal(t, 10*time.Second, segment.Duration())
require.False(t, segment.CanWrite(1*time.Millisecond))
}

128
pkg/playlist/playlist.go Normal file
View File

@ -0,0 +1,128 @@
package playlist
import (
"fmt"
"io"
"segmento/pkg/media"
"time"
)
const DefaultMaxSegments = 10
type PlaylistSegment struct {
media.Segment
url string
seqId int
}
type Playlist interface {
Len() int
AddConsumer(c Consumer)
RemoveConsumer(c Consumer) error
}
type Consumer interface {
PlaylistUpdated(p Playlist)
PlaylistSegmentAdded(p Playlist, s *PlaylistSegment)
}
// so we know that we can publish a segment (i.e. make available a URL to access it from elsewhere)
// but what does it mean to publish a Playlist?
// A playlist contains lists of Segments but it wouldn't necessarily be published in the same location
// for example, Segments may be published to S3 but a playlist may be published periodically to a static
// hosting location elsewhere.
type MediaPlaylist struct {
nextSeqId int
src io.Reader // read a stream of bytes e.g. MP3
segmenter media.Segmenter // segment the incoming bytes. For now, up to the caller to provider a matching src and segmenter.
publisher media.SegmentPublisher // publish the segments somewhere (i.e. make available a URL with them)
segments []*PlaylistSegment // a slice of the last n segments
consumers map[Consumer]bool
}
// TODO does it make sense to do 50% dependency injection and 50% consumers here?
// Why not just pass everything through the consumer?
// Or how about define these methods on the interface and force implementations to provide them?
func NewMediaPlaylist(src io.Reader, segmenter media.Segmenter, publisher media.SegmentPublisher) *MediaPlaylist {
p := MediaPlaylist{
src: src,
segmenter: segmenter,
publisher: publisher,
segments: make([]*PlaylistSegment, 0, 10),
consumers: make(map[Consumer]bool),
}
return &p
}
func (p *MediaPlaylist) Len() int {
return len(p.segments)
}
func (p *MediaPlaylist) AddConsumer(c Consumer) {
p.consumers[c] = true
}
func (p *MediaPlaylist) RemoveConsumer(c Consumer) error {
delete(p.consumers, c)
return nil
}
func (p *MediaPlaylist) Run() error {
segments, err := p.segmenter.Segment(p.src)
if err != nil {
return err
}
for s := range segments {
if err = p.handleSegment(s); err != nil {
return err
}
}
return nil
}
func (p *MediaPlaylist) handleSegment(s *media.Segment) error {
// first, publish the segment:
url, err := p.publisher.Publish(s)
if err != nil {
return err
}
// initialize a new playlist segment:
nextSeqId := p.nextSeqId
p.nextSeqId++
ps := PlaylistSegment{
Segment: *s, // TODO make the Segmenter publish values, not pointers
seqId: nextSeqId,
url: url,
}
// append the playlist segment to our slice of segments:
p.segments = append(p.segments, &ps)
// trim the start of the playlist if needed:
if len(p.segments) > DefaultMaxSegments {
p.segments = p.segments[len(p.segments)-DefaultMaxSegments:]
}
for c, _ := range p.consumers {
c.PlaylistSegmentAdded(p, &ps)
}
return nil
}
func (p *MediaPlaylist) Render() string {
var r string
r += "#EXTM3U\n"
r += "#EXT-X-VERSION:3\n"
r += "#EXT-X-TARGETDURATION:3\n" // TODO
for _, s := range p.segments {
r += fmt.Sprintf("#EXTINF:%.05f\n", float32(s.Duration())/float32(time.Second))
r += "http://www.example.com/x.mp3\n"
}
r += "#EXT-X-ENDLIST"
return r
}

View File

@ -0,0 +1,102 @@
package playlist_test
import (
"io"
"segmento/pkg/media"
"segmento/pkg/playlist"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
)
type FakeReader struct {
}
func (r *FakeReader) Read([]byte) (int, error) {
return 0, nil
}
type FakeSegmenter struct {
count int
}
func (s *FakeSegmenter) Segment(r io.Reader) (chan *media.Segment, error) {
c := make(chan *media.Segment)
go func() {
dur := 2500 * time.Millisecond
for i := 0; i < s.count; i++ {
segment := media.NewSegment(dur, 0)
segment.IncrementDuration(dur)
c <- segment
}
close(c)
}()
return c, nil
}
type FakeSegmentPublisher struct {
count int
}
func (p *FakeSegmentPublisher) Publish(s *media.Segment) (string, error) {
p.count++
return "", nil
}
type consumer struct {
pCount, sCount int
}
func (c *consumer) PlaylistUpdated(p playlist.Playlist) {
c.pCount++
}
func (c *consumer) PlaylistSegmentAdded(p playlist.Playlist, s *playlist.PlaylistSegment) {
c.sCount++
}
func TestMediaPlaylistImplements(t *testing.T) {
require.Implements(t, (*playlist.Playlist)(nil), new(playlist.MediaPlaylist))
}
func TestMediaPlaylist(t *testing.T) {
publisher := &FakeSegmentPublisher{}
playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{3}, publisher)
err := playlist.Run()
require.NoError(t, err)
require.Equal(t, 3, playlist.Len())
require.Equal(t, 3, publisher.count)
}
func TestMediaPlaylistRender(t *testing.T) {
playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{2}, &FakeSegmentPublisher{})
err := playlist.Run()
require.NoError(t, err)
lines := strings.Split(playlist.Render(), "\n")
require.Equal(t, "#EXTM3U", lines[0])
require.Equal(t, "#EXT-X-VERSION:3", lines[1])
require.Equal(t, "#EXT-X-TARGETDURATION:3", lines[2])
require.Equal(t, "#EXTINF:2.50000", lines[3])
require.Equal(t, "http://www.example.com/x.mp3", lines[4])
require.Equal(t, "#EXTINF:2.50000", lines[5])
require.Equal(t, "http://www.example.com/x.mp3", lines[6])
}
func TestMediaPlaylistConsumer(t *testing.T) {
consumer := &consumer{}
require.Implements(t, (*playlist.Consumer)(nil), consumer)
playlist := playlist.NewMediaPlaylist(&FakeReader{}, &FakeSegmenter{4}, &FakeSegmentPublisher{})
playlist.AddConsumer(consumer)
err := playlist.Run()
require.NoError(t, err)
require.Equal(t, 4, consumer.sCount)
require.Equal(t, 0, consumer.pCount) // TODO
}

37
pkg/s3/s3.go Normal file
View File

@ -0,0 +1,37 @@
package s3
import (
"fmt"
"log"
"segmento/pkg/playlist"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
)
const DefaultAwsRegion = "eu-west-1"
func init() {
sess, err := session.NewSession(&aws.Config{Region: aws.String(DefaultAwsRegion)})
if err != nil {
log.Fatal(err)
}
}
type Consumer struct {
S3Bucket string
S3PathPrefix string
}
func NewConsumer(bucket string, pathPrefix string) *Consumer {
c := Consumer{bucket, pathPrefix}
return &c
}
func (c *Consumer) PlaylistUpdated(p playlist.Playlist) {
fmt.Println("s3: PlaylistUpdated")
}
func (c *Consumer) PlaylistSegmentAdded(p playlist.Playlist, s *playlist.PlaylistSegment) {
fmt.Println("s3: PlaylistSegmentAdded")
}