feat(mediaserver): configurable RTMP host and bind address
This commit is contained in:
parent
e113d55044
commit
b147da6d9b
10
README.md
10
README.md
@ -100,6 +100,10 @@ sources:
|
|||||||
rtmp:
|
rtmp:
|
||||||
enabled: true # must be true
|
enabled: true # must be true
|
||||||
streamKey: live # defaults to "live"
|
streamKey: live # defaults to "live"
|
||||||
|
host: rtmp.example.com # defaults to "localhost"
|
||||||
|
bindAddr: # optional
|
||||||
|
ip: 0.0.0.0 # defaults to 127.0.0.1
|
||||||
|
port: 1935 # defaults to 1935
|
||||||
destinations:
|
destinations:
|
||||||
- name: YouTube # Destination name, used only for display
|
- name: YouTube # Destination name, used only for display
|
||||||
url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key
|
url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key
|
||||||
@ -108,9 +112,13 @@ destinations:
|
|||||||
# other destinations here
|
# other destinations here
|
||||||
```
|
```
|
||||||
|
|
||||||
:warning: It is also possible to add and remove destinations directly from the
|
:information_source: It is also possible to add and remove destinations directly from the
|
||||||
terminal user interface.
|
terminal user interface.
|
||||||
|
|
||||||
|
:warning: `sources.rtmp.bindAddr.ip` must be set to a valid IP address if you want
|
||||||
|
to accept connections from other hosts. Leave it blank to bind only to
|
||||||
|
localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces.
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
### Bug reports
|
### Bug reports
|
||||||
|
@ -89,6 +89,8 @@ func Run(ctx context.Context, params RunParams) error {
|
|||||||
updateUI()
|
updateUI()
|
||||||
|
|
||||||
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
|
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
|
||||||
|
RTMPAddr: domain.NetAddr(cfg.Sources.RTMP.BindAddr),
|
||||||
|
RTMPHost: cfg.Sources.RTMP.Host,
|
||||||
StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey),
|
StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey),
|
||||||
ContainerClient: containerClient,
|
ContainerClient: containerClient,
|
||||||
Logger: logger.With("component", "mediaserver"),
|
Logger: logger.With("component", "mediaserver"),
|
||||||
|
@ -30,12 +30,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestIntegration(t *testing.T) {
|
func TestIntegration(t *testing.T) {
|
||||||
t.Run("with default stream key", func(t *testing.T) {
|
t.Run("with default host, port and stream key", func(t *testing.T) {
|
||||||
testIntegration(t, "")
|
testIntegration(t, "", "", 0, "")
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("with custom stream key", func(t *testing.T) {
|
t.Run("with custom host, port and stream key", func(t *testing.T) {
|
||||||
testIntegration(t, "s0meK3y")
|
testIntegration(t, "localhost", "0.0.0.0", 3000, "s0meK3y")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,11 +45,14 @@ func TestIntegration(t *testing.T) {
|
|||||||
// https://stackoverflow.com/a/60740997/62871
|
// https://stackoverflow.com/a/60740997/62871
|
||||||
const hostIP = "172.17.0.1"
|
const hostIP = "172.17.0.1"
|
||||||
|
|
||||||
func testIntegration(t *testing.T, streamKey string) {
|
func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, streamKey string) {
|
||||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
wantRTMPHost := cmp.Or(rtmpHost, "localhost")
|
||||||
|
wantRTMPPort := cmp.Or(rtmpPort, 1935)
|
||||||
wantStreamKey := cmp.Or(streamKey, "live")
|
wantStreamKey := cmp.Or(streamKey, "live")
|
||||||
|
wantRTMPURL := fmt.Sprintf("rtmp://%s:%d/%s", wantRTMPHost, wantRTMPPort, wantStreamKey)
|
||||||
|
|
||||||
destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
ContainerRequest: testcontainers.ContainerRequest{
|
||||||
@ -74,7 +77,13 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
destURL1 := fmt.Sprintf("rtmp://%s:%d/%s/dest1", hostIP, destServerPort.Int(), wantStreamKey)
|
destURL1 := fmt.Sprintf("rtmp://%s:%d/%s/dest1", hostIP, destServerPort.Int(), wantStreamKey)
|
||||||
destURL2 := fmt.Sprintf("rtmp://%s:%d/%s/dest2", hostIP, destServerPort.Int(), wantStreamKey)
|
destURL2 := fmt.Sprintf("rtmp://%s:%d/%s/dest2", hostIP, destServerPort.Int(), wantStreamKey)
|
||||||
configService := setupConfigService(t, config.Config{
|
configService := setupConfigService(t, config.Config{
|
||||||
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true, StreamKey: streamKey}},
|
Sources: config.Sources{
|
||||||
|
RTMP: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
Host: rtmpHost,
|
||||||
|
BindAddr: config.NetAddr{IP: rtmpIP, Port: rtmpPort},
|
||||||
|
StreamKey: streamKey,
|
||||||
|
}},
|
||||||
// Load one destination from config, add the other in-app.
|
// Load one destination from config, add the other in-app.
|
||||||
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
|
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
|
||||||
})
|
})
|
||||||
@ -116,7 +125,7 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
printScreen(t, getContents, "After starting the mediaserver")
|
printScreen(t, getContents, "After starting the mediaserver")
|
||||||
|
|
||||||
// Start streaming a test video to the app:
|
// Start streaming a test video to the app:
|
||||||
testhelpers.StreamFLV(t, "rtmp://localhost:1935/"+wantStreamKey)
|
testhelpers.StreamFLV(t, wantRTMPURL)
|
||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
@ -124,7 +133,7 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
||||||
|
|
||||||
assert.Contains(t, contents[1], "URL rtmp://localhost:1935/"+wantStreamKey, "expected mediaserver status to be receiving")
|
assert.Contains(t, contents[1], "URL "+wantRTMPURL, "expected mediaserver status to be receiving")
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
||||||
@ -256,6 +265,48 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIntegrationCustomRTMPURL(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
||||||
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
configService := setupConfigService(t, config.Config{
|
||||||
|
Sources: config.Sources{
|
||||||
|
RTMP: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
Host: "rtmp.live.tv",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.EventuallyWithT(
|
||||||
|
t,
|
||||||
|
func(t *assert.CollectT) {
|
||||||
|
assert.True(t, contentsIncludes(getContents(), "URL rtmp://rtmp.live.tv:1935/live"), "expected to see custom host name")
|
||||||
|
},
|
||||||
|
5*time.Second,
|
||||||
|
time.Second,
|
||||||
|
"expected to see custom host name",
|
||||||
|
)
|
||||||
|
printScreen(t, getContents, "Ater displaying the fatal error modal")
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
<-done
|
||||||
|
}
|
||||||
func TestIntegrationRestartDestination(t *testing.T) {
|
func TestIntegrationRestartDestination(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
@ -22,10 +22,18 @@ func (l LogFile) GetPath() string {
|
|||||||
return cmp.Or(l.Path, l.defaultPath)
|
return cmp.Or(l.Path, l.defaultPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NetAddr holds an IP and/or port.
|
||||||
|
type NetAddr struct {
|
||||||
|
IP string `yaml:"ip,omitempty"`
|
||||||
|
Port int `yaml:"port,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// RTMPSource holds the configuration for the RTMP source.
|
// RTMPSource holds the configuration for the RTMP source.
|
||||||
type RTMPSource struct {
|
type RTMPSource struct {
|
||||||
Enabled bool `yaml:"enabled"`
|
Enabled bool `yaml:"enabled"`
|
||||||
StreamKey string `yaml:"streamKey,omitempty"`
|
StreamKey string `yaml:"streamKey,omitempty"`
|
||||||
|
Host string `yaml:"host,omitempty"`
|
||||||
|
BindAddr NetAddr `yaml:"bindAddr,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sources holds the configuration for the sources.
|
// Sources holds the configuration for the sources.
|
||||||
|
@ -93,6 +93,11 @@ func TestConfigServiceReadConfig(t *testing.T) {
|
|||||||
RTMP: config.RTMPSource{
|
RTMP: config.RTMPSource{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
StreamKey: "s3cr3t",
|
StreamKey: "s3cr3t",
|
||||||
|
Host: "rtmp.example.com",
|
||||||
|
BindAddr: config.NetAddr{
|
||||||
|
IP: "0.0.0.0",
|
||||||
|
Port: 19350,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Destinations: []config.Destination{
|
Destinations: []config.Destination{
|
||||||
|
4
internal/config/testdata/complete.yml
vendored
4
internal/config/testdata/complete.yml
vendored
@ -6,6 +6,10 @@ sources:
|
|||||||
rtmp:
|
rtmp:
|
||||||
enabled: true
|
enabled: true
|
||||||
streamKey: s3cr3t
|
streamKey: s3cr3t
|
||||||
|
host: rtmp.example.com
|
||||||
|
bindAddr:
|
||||||
|
ip: 0.0.0.0
|
||||||
|
port: 19350
|
||||||
destinations:
|
destinations:
|
||||||
- name: my stream
|
- name: my stream
|
||||||
url: rtmp://rtmp.example.com:1935/live
|
url: rtmp://rtmp.example.com:1935/live
|
||||||
|
@ -57,6 +57,12 @@ type Destination struct {
|
|||||||
URL string
|
URL string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NetAddr holds a network address.
|
||||||
|
type NetAddr struct {
|
||||||
|
IP string
|
||||||
|
Port int
|
||||||
|
}
|
||||||
|
|
||||||
// Container status strings.
|
// Container status strings.
|
||||||
//
|
//
|
||||||
// TODO: refactor to strictly reflect Docker status strings.
|
// TODO: refactor to strictly reflect Docker status strings.
|
||||||
|
@ -8,7 +8,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
typescontainer "github.com/docker/docker/api/types/container"
|
typescontainer "github.com/docker/docker/api/types/container"
|
||||||
@ -29,7 +28,9 @@ type StreamKey string
|
|||||||
const (
|
const (
|
||||||
defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server
|
defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server
|
||||||
defaultAPIPort = 9997 // default API host port for the media server
|
defaultAPIPort = 9997 // default API host port for the media server
|
||||||
|
defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security
|
||||||
defaultRTMPPort = 1935 // default RTMP host port for the media server
|
defaultRTMPPort = 1935 // default RTMP host port for the media server
|
||||||
|
defaultRTMPHost = "localhost" // default RTMP host name, used for the RTMP URL
|
||||||
defaultChanSize = 64 // default channel size for asynchronous non-error channels
|
defaultChanSize = 64 // default channel size for asynchronous non-error channels
|
||||||
imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
|
imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
|
||||||
defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey].
|
defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey].
|
||||||
@ -47,7 +48,8 @@ type Actor struct {
|
|||||||
chanSize int
|
chanSize int
|
||||||
containerClient *container.Client
|
containerClient *container.Client
|
||||||
apiPort int
|
apiPort int
|
||||||
rtmpPort int
|
rtmpAddr domain.NetAddr
|
||||||
|
rtmpHost string
|
||||||
streamKey StreamKey
|
streamKey StreamKey
|
||||||
fetchIngressStateInterval time.Duration
|
fetchIngressStateInterval time.Duration
|
||||||
pass string // password for the media server
|
pass string // password for the media server
|
||||||
@ -63,7 +65,8 @@ type Actor struct {
|
|||||||
// actor.
|
// actor.
|
||||||
type NewActorParams struct {
|
type NewActorParams struct {
|
||||||
APIPort int // defaults to 9997
|
APIPort int // defaults to 9997
|
||||||
RTMPPort int // defaults to 1935
|
RTMPAddr domain.NetAddr // defaults to 127.0.0.1:1935
|
||||||
|
RTMPHost string // defaults to "localhost"
|
||||||
StreamKey StreamKey // defaults to "live"
|
StreamKey StreamKey // defaults to "live"
|
||||||
ChanSize int // defaults to 64
|
ChanSize int // defaults to 64
|
||||||
FetchIngressStateInterval time.Duration // defaults to 5 seconds
|
FetchIngressStateInterval time.Duration // defaults to 5 seconds
|
||||||
@ -84,10 +87,15 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error)
|
|||||||
return nil, fmt.Errorf("build API client: %w", err)
|
return nil, fmt.Errorf("build API client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rtmpAddr := params.RTMPAddr
|
||||||
|
rtmpAddr.IP = cmp.Or(rtmpAddr.IP, defaultRTMPIP)
|
||||||
|
rtmpAddr.Port = cmp.Or(rtmpAddr.Port, defaultRTMPPort)
|
||||||
|
|
||||||
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
|
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
|
||||||
return &Actor{
|
return &Actor{
|
||||||
apiPort: cmp.Or(params.APIPort, defaultAPIPort),
|
apiPort: cmp.Or(params.APIPort, defaultAPIPort),
|
||||||
rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort),
|
rtmpAddr: rtmpAddr,
|
||||||
|
rtmpHost: cmp.Or(params.RTMPHost, defaultRTMPHost),
|
||||||
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
|
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
|
||||||
fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval),
|
fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval),
|
||||||
tlsCert: tlsCert,
|
tlsCert: tlsCert,
|
||||||
@ -104,10 +112,8 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *Actor) Start(ctx context.Context) error {
|
func (a *Actor) Start(ctx context.Context) error {
|
||||||
// Exposed ports are bound to 127.0.0.1 for security.
|
apiPortSpec := nat.Port(fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort))
|
||||||
// TODO: configurable RTMP bind address
|
rtmpPortSpec := nat.Port(fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935))
|
||||||
apiPortSpec := nat.Port("127.0.0.1:" + strconv.Itoa(a.apiPort) + ":9997")
|
|
||||||
rtmpPortSpec := nat.Port("127.0.0.1:" + strconv.Itoa(+a.rtmpPort) + ":1935")
|
|
||||||
exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)})
|
exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)})
|
||||||
|
|
||||||
// The RTMP URL is passed to the UI via the state.
|
// The RTMP URL is passed to the UI via the state.
|
||||||
@ -155,6 +161,7 @@ func (a *Actor) Start(ctx context.Context) error {
|
|||||||
return fmt.Errorf("marshal config: %w", err)
|
return fmt.Errorf("marshal config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
a.logger.Info("Starting media server", "host", a.rtmpHost, "bind_ip", a.rtmpAddr.IP, "bind_port", a.rtmpAddr.Port)
|
||||||
containerStateC, errC := a.containerClient.RunContainer(
|
containerStateC, errC := a.containerClient.RunContainer(
|
||||||
ctx,
|
ctx,
|
||||||
container.RunContainerParams{
|
container.RunContainerParams{
|
||||||
@ -352,7 +359,7 @@ func (s *Actor) handleContainerExit(err error) {
|
|||||||
|
|
||||||
// RTMPURL returns the RTMP URL for the media server, accessible from the host.
|
// RTMPURL returns the RTMP URL for the media server, accessible from the host.
|
||||||
func (s *Actor) RTMPURL() string {
|
func (s *Actor) RTMPURL() string {
|
||||||
return fmt.Sprintf("rtmp://localhost:%d/%s", s.rtmpPort, s.streamKey)
|
return fmt.Sprintf("rtmp://%s:%d/%s", s.rtmpHost, s.rtmpAddr.Port, s.streamKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RTMPInternalURL returns the RTMP URL for the media server, accessible from
|
// RTMPInternalURL returns the RTMP URL for the media server, accessible from
|
||||||
|
@ -696,7 +696,7 @@ func (ui *UI) redrawFromState(state domain.AppState) {
|
|||||||
SetSelectable(false)
|
SetSelectable(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
ui.sourceViews.url.SetText(state.Source.RTMPURL)
|
ui.sourceViews.url.SetText(cmp.Or(state.Source.RTMPURL, dash))
|
||||||
|
|
||||||
tracks := dash
|
tracks := dash
|
||||||
if state.Source.Live && len(state.Source.Tracks) > 0 {
|
if state.Source.Live && len(state.Source.Tracks) > 0 {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user