package main import ( "bytes" "flag" "io" "log" "net/http" "os" "time" "github.com/tcolgate/mp3" ) const DefaultTargetDuration = 3 * time.Second type Segment struct { targetDuration time.Duration duration time.Duration data *bytes.Buffer } // Initialize a new Segment struct. The capacity is the initial maximum capacity of the internal // buffer, in bytes. It should be initialized with a value greater than the expected maximum buffer size, // depending on the implementation. func newSegment(targetDuration time.Duration, capacity int) *Segment { return &Segment{ data: bytes.NewBuffer(make([]byte, 0, capacity)), duration: 0, targetDuration: targetDuration, } } func (s *Segment) ReadFrom(r io.Reader) (n int64, err error) { return s.data.ReadFrom(r) } func (s *Segment) IncrementDuration(d time.Duration) time.Duration { s.duration += d return s.duration } func (s *Segment) CanWrite(d time.Duration) bool { return s.targetDuration-s.duration >= d } func (s *Segment) Duration() time.Duration { return s.duration } func (s *Segment) Len() int { return s.data.Len() } type MP3HTTPSegmenter struct { decoder *mp3.Decoder } func (s *MP3HTTPSegmenter) Segment(r io.Reader) (chan *Segment, error) { c := make(chan *Segment) go func() { d := mp3.NewDecoder(r) var ( v mp3.Frame skipped int ) var ( s *Segment ) for { if err := d.Decode(&v, &skipped); err != nil { if err == io.EOF { break } log.Fatal(err) } if s != nil && !s.CanWrite(v.Duration()) { // publish the current segment, and initialize a new one. c <- s s = nil } if s == nil { // TODO what is a good initial capacity? s = newSegment(DefaultTargetDuration, 1024) } n, err := s.ReadFrom(v.Reader()) if err != nil { log.Fatal(err) // TODO: some proper error handling } if n != int64(v.Size()) { // TODO would this ever happen? log.Fatal("unexpeced frame size, want = ", v.Size(), "got = ", n) } s.IncrementDuration(v.Duration()) } }() return c, nil } func newMP3HTTPSegmenter() *MP3HTTPSegmenter { return &MP3HTTPSegmenter{} } type Playlist interface { // These could be moved to an interface? Duration() time.Duration TargetDuration() time.Duration } type MediaPlaylist struct { Segments []*Segment } func newMediaPlaylist() *MediaPlaylist { return &MediaPlaylist{ Segments: make([]*Segment, 0, 10), } } func (p *MediaPlaylist) Duration() time.Duration { var t time.Duration for _, s := range p.Segments { t += s.Duration() } return t } func (p *MediaPlaylist) Run() error { for { // TODO block here and listen to the channel of incoming segments. // As the reader is Read and segments are produced, update the Playlist // struct and possibly notify consumers. // What would actually be a useful API and/or Go best practices? } } func main() { // TODO accept some flags with: // URL - source of stream // TargetLength - length of segments in seconds // Output // -d some_dir/ => output playlist and chunks to this directory, cleaning up old files from time to time. // -b 0.0.0.0:3000 => serve playlist and chunks from an HTTP server bound to this address var url string flag.StringVar(&url, "url", "", "URL of MP3 stream") flag.Parse() if url == "" { log.Println("Invalid arguments") flag.PrintDefaults() os.Exit(-1) } client := &http.Client{} resp, err := client.Get(url) if err != nil { log.Fatal(err) } defer resp.Body.Close() segmenter := newMP3HTTPSegmenter() segments, err := segmenter.Segment(resp.Body) if err != nil { log.Fatal(err) } go func() { for s := range segments { log.Println("got segment with duration", s.Duration(), "and len", s.Len(), "bytes") } }() time.Sleep(10 * time.Second) log.Println("exiting") }