Compare commits
2 Commits
60c7fb458b
...
98d93ad286
Author | SHA1 | Date | |
---|---|---|---|
|
98d93ad286 | ||
|
5f026be769 |
13
README.md
13
README.md
@ -100,9 +100,14 @@ sources:
|
||||
mediaServer:
|
||||
streamKey: live # defaults to "live"
|
||||
host: rtmp.example.com # defaults to "localhost"
|
||||
rtmp: # must be present, use `rtmp: {}` for defaults
|
||||
ip: 0.0.0.0 # defaults to 127.0.0.1
|
||||
rtmp:
|
||||
enabled: true # defaults to false
|
||||
ip: 127.0.0.1 # defaults to 127.0.0.1
|
||||
port: 1935 # defaults to 1935
|
||||
rtmps:
|
||||
enabled: true # defaults to false
|
||||
ip: 0.0.0.0 # defaults to 127.0.0.1
|
||||
port: 1936 # defaults to 1936
|
||||
destinations:
|
||||
- name: YouTube # Destination name, used only for display
|
||||
url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key
|
||||
@ -114,8 +119,8 @@ destinations:
|
||||
:information_source: It is also possible to add and remove destinations directly from the
|
||||
terminal user interface.
|
||||
|
||||
:warning: `sources.rtmp.bindAddr.ip` must be set to a valid IP address if you want
|
||||
to accept connections from other hosts. Leave it blank to bind only to
|
||||
:warning: `sources.mediaServer.rtmp.ip` must be set to a valid IP address if
|
||||
you want to accept connections from other hosts. Leave it blank to bind only to
|
||||
localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces.
|
||||
|
||||
## Contributing
|
||||
|
@ -38,9 +38,9 @@ func Run(ctx context.Context, params RunParams) error {
|
||||
state := new(domain.AppState)
|
||||
applyConfig(cfg, state)
|
||||
|
||||
// While RTMP is the only source, it doesn't make sense to disable it.
|
||||
if cfg.Sources.MediaServer.RTMP == nil {
|
||||
return errors.New("config: sources.mediaServer.rtmp is required")
|
||||
// Ensure there is at least one active source.
|
||||
if !cfg.Sources.MediaServer.RTMP.Enabled && !cfg.Sources.MediaServer.RTMPS.Enabled {
|
||||
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
|
||||
}
|
||||
|
||||
logger := params.Logger
|
||||
@ -89,7 +89,8 @@ func Run(ctx context.Context, params RunParams) error {
|
||||
updateUI()
|
||||
|
||||
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
|
||||
RTMPAddr: domain.NetAddr(cfg.Sources.MediaServer.RTMP.NetAddr),
|
||||
RTMPAddr: buildNetAddr(cfg.Sources.MediaServer.RTMP),
|
||||
RTMPSAddr: buildNetAddr(cfg.Sources.MediaServer.RTMPS),
|
||||
Host: cfg.Sources.MediaServer.Host,
|
||||
StreamKey: mediaserver.StreamKey(cfg.Sources.MediaServer.StreamKey),
|
||||
ContainerClient: containerClient,
|
||||
@ -104,6 +105,10 @@ func Run(ctx context.Context, params RunParams) error {
|
||||
}
|
||||
defer srv.Close()
|
||||
|
||||
// Set the RTMP and RTMPS URLs in the UI, which are only known after the
|
||||
// MediaServer is available.
|
||||
ui.SetRTMPURLs(srv.RTMPURL(), srv.RTMPSURL())
|
||||
|
||||
repl := replicator.StartActor(ctx, replicator.StartActorParams{
|
||||
SourceURL: srv.RTMPInternalURL(),
|
||||
ContainerClient: containerClient,
|
||||
@ -321,3 +326,12 @@ func doStartupCheck(ctx context.Context, containerClient *container.Client, show
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config.
|
||||
func buildNetAddr(src config.RTMPSource) mediaserver.OptionalNetAddr {
|
||||
if !src.Enabled {
|
||||
return mediaserver.OptionalNetAddr{Enabled: false}
|
||||
}
|
||||
|
||||
return mediaserver.OptionalNetAddr{Enabled: true, NetAddr: domain.NetAddr(src.NetAddr)}
|
||||
}
|
||||
|
@ -85,6 +85,8 @@ func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- termina
|
||||
lines[y] += string(screenCells[n].Runes[0])
|
||||
}
|
||||
|
||||
require.GreaterOrEqual(t, len(lines), 5, "Screen contents should have at least 5 lines")
|
||||
|
||||
return lines
|
||||
}
|
||||
|
||||
|
@ -29,13 +29,41 @@ import (
|
||||
"github.com/testcontainers/testcontainers-go/wait"
|
||||
)
|
||||
|
||||
const waitTime = time.Minute
|
||||
|
||||
func TestIntegration(t *testing.T) {
|
||||
t.Run("with default host, port and stream key", func(t *testing.T) {
|
||||
testIntegration(t, "", "", 0, "")
|
||||
t.Run("RTMP with default host, port and stream key", func(t *testing.T) {
|
||||
testIntegration(t, config.MediaServerSource{
|
||||
RTMP: config.RTMPSource{Enabled: true},
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("with custom host, port and stream key", func(t *testing.T) {
|
||||
testIntegration(t, "localhost", "0.0.0.0", 3000, "s0meK3y")
|
||||
t.Run("RTMPS with default host, port and stream key", func(t *testing.T) {
|
||||
testIntegration(t, config.MediaServerSource{
|
||||
RTMPS: config.RTMPSource{Enabled: true},
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("RTMP with custom host, port and stream key", func(t *testing.T) {
|
||||
testIntegration(t, config.MediaServerSource{
|
||||
StreamKey: "s0meK3y",
|
||||
Host: "localhost",
|
||||
RTMP: config.RTMPSource{
|
||||
Enabled: true,
|
||||
NetAddr: config.NetAddr{IP: "0.0.0.0", Port: 3000},
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("RTMPS with custom host, port and stream key", func(t *testing.T) {
|
||||
testIntegration(t, config.MediaServerSource{
|
||||
StreamKey: "an0therK3y",
|
||||
Host: "localhost",
|
||||
RTMPS: config.RTMPSource{
|
||||
Enabled: true,
|
||||
NetAddr: config.NetAddr{IP: "0.0.0.0", Port: 443},
|
||||
},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@ -45,27 +73,39 @@ func TestIntegration(t *testing.T) {
|
||||
// https://stackoverflow.com/a/60740997/62871
|
||||
const hostIP = "172.17.0.1"
|
||||
|
||||
func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, streamKey string) {
|
||||
func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
|
||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
wantRTMPHost := cmp.Or(rtmpHost, "localhost")
|
||||
wantRTMPPort := cmp.Or(rtmpPort, 1935)
|
||||
wantStreamKey := cmp.Or(streamKey, "live")
|
||||
wantRTMPURL := fmt.Sprintf("rtmp://%s:%d/%s", wantRTMPHost, wantRTMPPort, wantStreamKey)
|
||||
var rtmpConfig config.RTMPSource
|
||||
var proto string
|
||||
var defaultPort int
|
||||
if mediaServerConfig.RTMP.Enabled {
|
||||
rtmpConfig = mediaServerConfig.RTMP
|
||||
proto = "rtmp://"
|
||||
defaultPort = 1935
|
||||
} else {
|
||||
rtmpConfig = mediaServerConfig.RTMPS
|
||||
proto = "rtmps://"
|
||||
defaultPort = 1936
|
||||
}
|
||||
|
||||
wantHost := cmp.Or(mediaServerConfig.Host, "localhost")
|
||||
wantRTMPPort := cmp.Or(rtmpConfig.Port, defaultPort)
|
||||
wantStreamKey := cmp.Or(mediaServerConfig.StreamKey, "live")
|
||||
wantRTMPURL := fmt.Sprintf("%s%s:%d/%s", proto, wantHost, wantRTMPPort, wantStreamKey)
|
||||
|
||||
destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
||||
ContainerRequest: testcontainers.ContainerRequest{
|
||||
Image: "bluenviron/mediamtx:latest",
|
||||
Env: map[string]string{"MTX_RTMPADDRESS": ":1936"},
|
||||
ExposedPorts: []string{"1936/tcp"},
|
||||
WaitingFor: wait.ForListeningPort("1936/tcp"),
|
||||
ExposedPorts: []string{"1935/tcp"},
|
||||
WaitingFor: wait.ForListeningPort("1935/tcp"),
|
||||
},
|
||||
Started: true,
|
||||
})
|
||||
testcontainers.CleanupContainer(t, destServer)
|
||||
require.NoError(t, err)
|
||||
destServerPort, err := destServer.MappedPort(ctx, "1936/tcp")
|
||||
destServerPort, err := destServer.MappedPort(ctx, "1935/tcp")
|
||||
require.NoError(t, err)
|
||||
|
||||
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
||||
@ -74,17 +114,10 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
||||
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
destURL1 := fmt.Sprintf("rtmp://%s:%d/%s/dest1", hostIP, destServerPort.Int(), wantStreamKey)
|
||||
destURL2 := fmt.Sprintf("rtmp://%s:%d/%s/dest2", hostIP, destServerPort.Int(), wantStreamKey)
|
||||
destURL1 := fmt.Sprintf("rtmp://%s:%d/live/dest1", hostIP, destServerPort.Int())
|
||||
destURL2 := fmt.Sprintf("rtmp://%s:%d/live/dest2", hostIP, destServerPort.Int())
|
||||
configService := setupConfigService(t, config.Config{
|
||||
Sources: config.Sources{
|
||||
MediaServer: config.MediaServerSource{
|
||||
Host: rtmpHost,
|
||||
StreamKey: streamKey,
|
||||
RTMP: &config.RTMPSource{
|
||||
NetAddr: config.NetAddr{IP: rtmpIP, Port: rtmpPort},
|
||||
}},
|
||||
},
|
||||
Sources: config.Sources{MediaServer: mediaServerConfig},
|
||||
// Load one destination from config, add the other in-app.
|
||||
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
|
||||
})
|
||||
@ -113,13 +146,12 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||
},
|
||||
2*time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected the mediaserver to start",
|
||||
)
|
||||
@ -130,16 +162,14 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
||||
|
||||
assert.Contains(t, contents[1], "URL "+wantRTMPURL, "expected mediaserver status to be receiving")
|
||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
||||
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||
},
|
||||
time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to receive an ingress stream",
|
||||
)
|
||||
@ -159,22 +189,21 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
||||
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||
|
||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air")
|
||||
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(c, contents[3], "off-air", "expected local server 0 to be off-air")
|
||||
|
||||
require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present")
|
||||
assert.Contains(t, contents[3], "off-air", "expected local server 2 to be off-air")
|
||||
require.Contains(c, contents[3], "Local server 2", "expected local server 2 to be present")
|
||||
assert.Contains(c, contents[3], "off-air", "expected local server 2 to be off-air")
|
||||
|
||||
},
|
||||
2*time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to add the destinations",
|
||||
)
|
||||
@ -187,23 +216,22 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
||||
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||
|
||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||
|
||||
require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present")
|
||||
assert.Contains(t, contents[3], "sending", "expected local server 2 to be sending")
|
||||
assert.Contains(t, contents[3], "healthy", "expected local server 2 to be healthy")
|
||||
require.Contains(c, contents[3], "Local server 2", "expected local server 2 to be present")
|
||||
assert.Contains(c, contents[3], "sending", "expected local server 2 to be sending")
|
||||
assert.Contains(c, contents[3], "healthy", "expected local server 2 to be healthy")
|
||||
},
|
||||
2*time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to start the destination streams",
|
||||
)
|
||||
@ -215,22 +243,21 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
||||
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||
|
||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||
|
||||
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||
require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||
|
||||
},
|
||||
2*time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to remove the second destination",
|
||||
)
|
||||
@ -241,16 +268,15 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
||||
|
||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited")
|
||||
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(c, contents[2], "exited", "expected local server 1 to have exited")
|
||||
|
||||
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||
require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||
},
|
||||
time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to stop the first destination stream",
|
||||
)
|
||||
@ -278,7 +304,7 @@ func TestIntegrationCustomRTMPURL(t *testing.T) {
|
||||
Sources: config.Sources{
|
||||
MediaServer: config.MediaServerSource{
|
||||
Host: "rtmp.live.tv",
|
||||
RTMP: &config.RTMPSource{},
|
||||
RTMP: config.RTMPSource{Enabled: true},
|
||||
},
|
||||
},
|
||||
})
|
||||
@ -293,12 +319,15 @@ func TestIntegrationCustomRTMPURL(t *testing.T) {
|
||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||
}()
|
||||
|
||||
time.Sleep(time.Second)
|
||||
sendKey(t, screen, tcell.KeyF1, ' ')
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
assert.True(t, contentsIncludes(getContents(), "URL rtmp://rtmp.live.tv:1935/live"), "expected to see custom host name")
|
||||
assert.True(t, contentsIncludes(getContents(), "rtmp://rtmp.live.tv:1935/live"), "expected to see custom host name")
|
||||
},
|
||||
5*time.Second,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to see custom host name",
|
||||
)
|
||||
@ -308,6 +337,7 @@ func TestIntegrationCustomRTMPURL(t *testing.T) {
|
||||
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationRestartDestination(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||
defer cancel()
|
||||
@ -337,7 +367,7 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
configService := setupConfigService(t, config.Config{
|
||||
Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}},
|
||||
Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}},
|
||||
Destinations: []config.Destination{{
|
||||
Name: "Local server 1",
|
||||
URL: fmt.Sprintf("rtmp://%s:%d/live", hostIP, destServerRTMPPort.Int()),
|
||||
@ -355,13 +385,12 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||
},
|
||||
2*time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected the mediaserver to start",
|
||||
)
|
||||
@ -372,13 +401,12 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||
},
|
||||
time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to receive an ingress stream",
|
||||
)
|
||||
@ -389,17 +417,16 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||
|
||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||
},
|
||||
2*time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to start the destination stream",
|
||||
)
|
||||
@ -412,17 +439,16 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||
|
||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air")
|
||||
assert.Contains(t, contents[2], "restarting", "expected local server 1 to be restarting")
|
||||
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(c, contents[2], "off-air", "expected local server 1 to be off-air")
|
||||
assert.Contains(c, contents[2], "restarting", "expected local server 1 to be restarting")
|
||||
},
|
||||
20*time.Second,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to begin restarting",
|
||||
)
|
||||
@ -430,17 +456,16 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 4, "expected at least 4 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||
|
||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||
},
|
||||
2*time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to restart the destination stream",
|
||||
)
|
||||
@ -451,16 +476,15 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 4, "expected at least 4 lines of output")
|
||||
|
||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited")
|
||||
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||
assert.Contains(c, contents[2], "exited", "expected local server 1 to have exited")
|
||||
|
||||
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||
require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||
},
|
||||
time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to stop the destination stream",
|
||||
)
|
||||
@ -483,7 +507,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
configService := setupConfigService(t, config.Config{
|
||||
Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}},
|
||||
Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}},
|
||||
Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}},
|
||||
})
|
||||
|
||||
@ -498,13 +522,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||
},
|
||||
2*time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected the mediaserver to start",
|
||||
)
|
||||
@ -515,13 +538,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
||||
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||
},
|
||||
time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to receive an ingress stream",
|
||||
)
|
||||
@ -532,12 +554,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
assert.True(t, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error")
|
||||
assert.True(t, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error")
|
||||
assert.True(c, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error")
|
||||
assert.True(c, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error")
|
||||
},
|
||||
time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to see the destination start error modal",
|
||||
)
|
||||
@ -559,7 +581,7 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
configService := setupConfigService(t, config.Config{
|
||||
Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: &config.RTMPSource{}}},
|
||||
Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: config.RTMPSource{Enabled: true}}},
|
||||
})
|
||||
|
||||
done := make(chan struct{})
|
||||
@ -573,14 +595,14 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
||||
require.True(c, len(contents) > 2, "expected at least 3 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||
assert.True(t, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to see no destinations message")
|
||||
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||
assert.True(c, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to see no destinations message")
|
||||
},
|
||||
2*time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected the mediaserver to start",
|
||||
)
|
||||
@ -594,13 +616,13 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
|
||||
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||
assert.True(t, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
||||
assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||
assert.True(c, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
||||
},
|
||||
10*time.Second,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected a validation error for an empty URL",
|
||||
)
|
||||
@ -614,13 +636,13 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
|
||||
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||
assert.True(t, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
||||
assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||
assert.True(c, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
||||
},
|
||||
10*time.Second,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected a validation error for an invalid URL",
|
||||
)
|
||||
@ -635,15 +657,14 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
||||
|
||||
require.Contains(t, contents[2], "My stream", "expected new destination to be present")
|
||||
assert.Contains(t, contents[2], "off-air", "expected new destination to be off-air")
|
||||
assert.False(t, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to not see no destinations message")
|
||||
require.Contains(c, contents[2], "My stream", "expected new destination to be present")
|
||||
assert.Contains(c, contents[2], "off-air", "expected new destination to be off-air")
|
||||
assert.False(c, contentsIncludes(contents, "No destinations added yet"), "expected to not see no destinations message")
|
||||
},
|
||||
10*time.Second,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to add the destination",
|
||||
)
|
||||
@ -661,13 +682,13 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
|
||||
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||
assert.True(t, contentsIncludes(contents, "validate: duplicate destination URL: rtmp://"), "expected to see config update error")
|
||||
assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||
assert.True(c, contentsIncludes(contents, "validate: duplicate destination URL: rtmp://"), "expected to see config update error")
|
||||
},
|
||||
10*time.Second,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected a validation error for a duplicate URL",
|
||||
)
|
||||
@ -702,7 +723,7 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
||||
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||
require.NoError(t, err)
|
||||
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}}})
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
done := make(chan struct{})
|
||||
@ -716,10 +737,10 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
assert.True(t, contentsIncludes(getContents(), "Another instance of Octoplex may already be running."), "expected to see startup check modal")
|
||||
func(c *assert.CollectT) {
|
||||
assert.True(c, contentsIncludes(getContents(), "Another instance of Octoplex may already be running."), "expected to see startup check modal")
|
||||
},
|
||||
30*time.Second,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to see startup check modal",
|
||||
)
|
||||
@ -729,13 +750,13 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
_, err := staleContainer.State(context.Background())
|
||||
// IsRunning() does not work, probably because we're undercutting the
|
||||
// testcontainers API.
|
||||
require.True(t, errdefs.IsNotFound(err), "expected to not find the container")
|
||||
require.True(c, errdefs.IsNotFound(err), "expected to not find the container")
|
||||
},
|
||||
time.Minute,
|
||||
waitTime,
|
||||
2*time.Second,
|
||||
"expected to quit the other containers",
|
||||
)
|
||||
@ -743,13 +764,13 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
func(c *assert.CollectT) {
|
||||
contents := getContents()
|
||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
||||
require.True(c, len(contents) > 2, "expected at least 3 lines of output")
|
||||
|
||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||
},
|
||||
10*time.Second,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected the mediaserver to start",
|
||||
)
|
||||
@ -771,7 +792,7 @@ func TestIntegrationMediaServerError(t *testing.T) {
|
||||
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||
require.NoError(t, err)
|
||||
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}}})
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
done := make(chan struct{})
|
||||
@ -785,11 +806,11 @@ func TestIntegrationMediaServerError(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
assert.True(t, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title")
|
||||
assert.True(t, contentsIncludes(getContents(), "address already in use"), "expected to see message")
|
||||
func(c *assert.CollectT) {
|
||||
assert.True(c, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title")
|
||||
assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message")
|
||||
},
|
||||
time.Minute,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to see media server error modal",
|
||||
)
|
||||
@ -810,7 +831,7 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
||||
var dockerClient mocks.DockerClient
|
||||
dockerClient.EXPECT().NetworkCreate(mock.Anything, mock.Anything, mock.Anything).Return(network.CreateResponse{}, errors.New("boom"))
|
||||
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}}})
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
done := make(chan struct{})
|
||||
@ -828,11 +849,11 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
assert.True(t, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
||||
assert.True(t, contentsIncludes(getContents(), "create container client: network create: boom"), "expected to see message")
|
||||
func(c *assert.CollectT) {
|
||||
assert.True(c, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
||||
assert.True(c, contentsIncludes(getContents(), "create container client: network create: boom"), "expected to see message")
|
||||
},
|
||||
5*time.Second,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to see fatal error modal",
|
||||
)
|
||||
@ -843,6 +864,7 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
||||
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationDockerConnectionError(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||
defer cancel()
|
||||
@ -851,7 +873,7 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
||||
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost("http://docker.example.com"))
|
||||
require.NoError(t, err)
|
||||
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}}})
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
done := make(chan struct{})
|
||||
@ -867,11 +889,11 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
func(t *assert.CollectT) {
|
||||
assert.True(t, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
||||
assert.True(t, contentsIncludes(getContents(), "Could not connect to Docker. Is Docker installed"), "expected to see message")
|
||||
func(c *assert.CollectT) {
|
||||
assert.True(c, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
||||
assert.True(c, contentsIncludes(getContents(), "Could not connect to Docker. Is Docker installed"), "expected to see message")
|
||||
},
|
||||
5*time.Second,
|
||||
waitTime,
|
||||
time.Second,
|
||||
"expected to see fatal error modal",
|
||||
)
|
||||
@ -882,3 +904,103 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
||||
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationCopyURLs(t *testing.T) {
|
||||
type binding struct {
|
||||
key tcell.Key
|
||||
content string
|
||||
url string
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
mediaServerConfig config.MediaServerSource
|
||||
wantBindings []binding
|
||||
wantNot []string
|
||||
}{
|
||||
{
|
||||
name: "RTMP only",
|
||||
mediaServerConfig: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}},
|
||||
wantBindings: []binding{
|
||||
{
|
||||
key: tcell.KeyF1,
|
||||
content: "F1 Copy source RTMP URL",
|
||||
url: "rtmp://localhost:1935/live",
|
||||
},
|
||||
},
|
||||
wantNot: []string{"F2"},
|
||||
},
|
||||
{
|
||||
name: "RTMPS only",
|
||||
mediaServerConfig: config.MediaServerSource{RTMPS: config.RTMPSource{Enabled: true}},
|
||||
wantBindings: []binding{
|
||||
{
|
||||
key: tcell.KeyF1,
|
||||
content: "F1 Copy source RTMPS URL",
|
||||
url: "rtmps://localhost:1936/live",
|
||||
},
|
||||
},
|
||||
wantNot: []string{"F2"},
|
||||
},
|
||||
{
|
||||
name: "RTMP and RTMPS",
|
||||
mediaServerConfig: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}, RTMPS: config.RTMPSource{Enabled: true}},
|
||||
wantBindings: []binding{
|
||||
{
|
||||
key: tcell.KeyF1,
|
||||
content: "F1 Copy source RTMP URL",
|
||||
url: "rtmp://localhost:1935/live",
|
||||
},
|
||||
{
|
||||
key: tcell.KeyF2,
|
||||
content: "F2 Copy source RTMPS URL",
|
||||
url: "rtmps://localhost:1936/live",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
|
||||
defer cancel()
|
||||
|
||||
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
||||
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||
require.NoError(t, err)
|
||||
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: tc.mediaServerConfig}})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||
}()
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
printScreen(t, getContents, "Ater loading the app")
|
||||
|
||||
for _, want := range tc.wantBindings {
|
||||
assert.True(t, contentsIncludes(getContents(), want.content), "expected to see %q", want)
|
||||
|
||||
sendKey(t, screen, want.key, ' ')
|
||||
time.Sleep(3 * time.Second)
|
||||
assert.True(t, contentsIncludes(getContents(), want.url), "expected to see copied message")
|
||||
|
||||
sendKey(t, screen, tcell.KeyEscape, ' ')
|
||||
}
|
||||
|
||||
for _, wantNot := range tc.wantNot {
|
||||
assert.False(t, contentsIncludes(getContents(), wantNot), "expected to not see %q", wantNot)
|
||||
}
|
||||
|
||||
cancel()
|
||||
|
||||
<-done
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -30,14 +30,17 @@ type NetAddr struct {
|
||||
|
||||
// RTMPSource holds the configuration for the RTMP source.
|
||||
type RTMPSource struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
|
||||
NetAddr `yaml:",inline"`
|
||||
}
|
||||
|
||||
// MediaServerSource holds the configuration for the media server source.
|
||||
type MediaServerSource struct {
|
||||
StreamKey string `yaml:"streamKey,omitempty"`
|
||||
Host string `yaml:"host,omitempty"`
|
||||
RTMP *RTMPSource `yaml:"rtmp,omitempty"`
|
||||
StreamKey string `yaml:"streamKey,omitempty"`
|
||||
Host string `yaml:"host,omitempty"`
|
||||
RTMP RTMPSource `yaml:"rtmp,omitempty"`
|
||||
RTMPS RTMPSource `yaml:"rtmps,omitempty"`
|
||||
}
|
||||
|
||||
// Sources holds the configuration for the sources.
|
||||
|
@ -180,10 +180,13 @@ func (s *Service) writeConfig(cfgBytes []byte) error {
|
||||
// populateConfigOnBuild is called to set default values for a new, empty
|
||||
// configuration.
|
||||
//
|
||||
// This function may set exported fields to arbitrary values.
|
||||
// This function may set serialized fields to arbitrary values.
|
||||
func (s *Service) populateConfigOnBuild(cfg *Config) {
|
||||
cfg.Sources.MediaServer.StreamKey = "live"
|
||||
cfg.Sources.MediaServer.RTMP = &RTMPSource{NetAddr{"127.0.0.1", 1935}}
|
||||
cfg.Sources.MediaServer.RTMP = RTMPSource{
|
||||
Enabled: true,
|
||||
NetAddr: NetAddr{IP: "127.0.0.1", Port: 1935},
|
||||
}
|
||||
|
||||
s.populateConfigOnRead(cfg)
|
||||
}
|
||||
@ -191,7 +194,7 @@ func (s *Service) populateConfigOnBuild(cfg *Config) {
|
||||
// populateConfigOnRead is called to set default values for a configuration
|
||||
// read from an existing file.
|
||||
//
|
||||
// This function should not update any exported values, which would be a
|
||||
// This function should not update any serialized values, which would be a
|
||||
// confusing experience for the user.
|
||||
func (s *Service) populateConfigOnRead(cfg *Config) {
|
||||
cfg.LogFile.defaultPath = filepath.Join(s.appStateDir, "octoplex.log")
|
||||
|
@ -19,6 +19,9 @@ import (
|
||||
//go:embed testdata/complete.yml
|
||||
var configComplete []byte
|
||||
|
||||
//go:embed testdata/rtmps-only.yml
|
||||
var configRTMPSOnly []byte
|
||||
|
||||
//go:embed testdata/logfile.yml
|
||||
var configLogfile []byte
|
||||
|
||||
@ -97,12 +100,20 @@ func TestConfigServiceReadConfig(t *testing.T) {
|
||||
MediaServer: config.MediaServerSource{
|
||||
StreamKey: "s3cr3t",
|
||||
Host: "rtmp.example.com",
|
||||
RTMP: &config.RTMPSource{
|
||||
RTMP: config.RTMPSource{
|
||||
Enabled: true,
|
||||
NetAddr: config.NetAddr{
|
||||
IP: "0.0.0.0",
|
||||
Port: 19350,
|
||||
},
|
||||
},
|
||||
RTMPS: config.RTMPSource{
|
||||
Enabled: true,
|
||||
NetAddr: config.NetAddr{
|
||||
IP: "0.0.0.0",
|
||||
Port: 19443,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Destinations: []config.Destination{
|
||||
@ -118,6 +129,33 @@ func TestConfigServiceReadConfig(t *testing.T) {
|
||||
)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "RTMPS only",
|
||||
configBytes: configRTMPSOnly,
|
||||
want: func(t *testing.T, cfg config.Config) {
|
||||
require.Empty(
|
||||
t,
|
||||
gocmp.Diff(
|
||||
config.Config{
|
||||
LogFile: config.LogFile{Enabled: true},
|
||||
Sources: config.Sources{
|
||||
MediaServer: config.MediaServerSource{
|
||||
RTMPS: config.RTMPSource{
|
||||
Enabled: true,
|
||||
NetAddr: config.NetAddr{
|
||||
IP: "0.0.0.0",
|
||||
Port: 1935,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
cfg,
|
||||
cmpopts.IgnoreUnexported(config.LogFile{}),
|
||||
),
|
||||
)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "logging enabled, logfile",
|
||||
configBytes: configLogfile,
|
||||
|
7
internal/config/testdata/complete.yml
vendored
7
internal/config/testdata/complete.yml
vendored
@ -7,8 +7,13 @@ sources:
|
||||
streamKey: s3cr3t
|
||||
host: rtmp.example.com
|
||||
rtmp:
|
||||
enabled: true
|
||||
ip: 0.0.0.0
|
||||
port: 19350
|
||||
port: 19350
|
||||
rtmps:
|
||||
enabled: true
|
||||
ip: 0.0.0.0
|
||||
port: 19443
|
||||
destinations:
|
||||
- name: my stream
|
||||
url: rtmp://rtmp.example.com:1935/live
|
||||
|
9
internal/config/testdata/rtmps-only.yml
vendored
Normal file
9
internal/config/testdata/rtmps-only.yml
vendored
Normal file
@ -0,0 +1,9 @@
|
||||
---
|
||||
logfile:
|
||||
enabled: true
|
||||
sources:
|
||||
mediaServer:
|
||||
rtmps:
|
||||
enabled: true
|
||||
ip: 0.0.0.0
|
||||
port: 1935
|
@ -35,7 +35,6 @@ type Source struct {
|
||||
Live bool
|
||||
LiveChangedAt time.Time
|
||||
Tracks []string
|
||||
RTMPURL string
|
||||
ExitReason string
|
||||
}
|
||||
|
||||
@ -62,6 +61,11 @@ type NetAddr struct {
|
||||
Port int
|
||||
}
|
||||
|
||||
// IsZero returns true if the NetAddr is zero value.
|
||||
func (n NetAddr) IsZero() bool {
|
||||
return n.IP == "" && n.Port == 0
|
||||
}
|
||||
|
||||
// Container status strings.
|
||||
//
|
||||
// TODO: refactor to strictly reflect Docker status strings.
|
||||
|
@ -31,3 +31,12 @@ func TestAppStateClone(t *testing.T) {
|
||||
s.Destinations[0].Name = "Twitch"
|
||||
assert.Equal(t, "YouTube", s2.Destinations[0].Name)
|
||||
}
|
||||
|
||||
func TestNetAddr(t *testing.T) {
|
||||
var addr domain.NetAddr
|
||||
assert.True(t, addr.IsZero())
|
||||
|
||||
addr.IP = "127.0.0.1"
|
||||
addr.Port = 3000
|
||||
assert.False(t, addr.IsZero())
|
||||
}
|
||||
|
@ -30,7 +30,8 @@ const (
|
||||
defaultAPIPort = 9997 // default API host port for the media server
|
||||
defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security
|
||||
defaultRTMPPort = 1935 // default RTMP host port for the media server
|
||||
defaultRTMPHost = "localhost" // default RTMP host name, used for the RTMP URL
|
||||
defaultRTMPSPort = 1936 // default RTMPS host port for the media server
|
||||
defaultHost = "localhost" // default mediaserver host name
|
||||
defaultChanSize = 64 // default channel size for asynchronous non-error channels
|
||||
imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
|
||||
defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey].
|
||||
@ -47,9 +48,10 @@ type Actor struct {
|
||||
stateC chan domain.Source
|
||||
chanSize int
|
||||
containerClient *container.Client
|
||||
apiPort int
|
||||
rtmpAddr domain.NetAddr
|
||||
rtmpHost string
|
||||
rtmpsAddr domain.NetAddr
|
||||
apiPort int
|
||||
host string
|
||||
streamKey StreamKey
|
||||
updateStateInterval time.Duration
|
||||
pass string // password for the media server
|
||||
@ -64,16 +66,25 @@ type Actor struct {
|
||||
// NewActorParams contains the parameters for building a new media server
|
||||
// actor.
|
||||
type NewActorParams struct {
|
||||
APIPort int // defaults to 9997
|
||||
RTMPAddr domain.NetAddr // defaults to 127.0.0.1:1935
|
||||
Host string // defaults to "localhost"
|
||||
StreamKey StreamKey // defaults to "live"
|
||||
ChanSize int // defaults to 64
|
||||
UpdateStateInterval time.Duration // defaults to 5 seconds
|
||||
RTMPAddr OptionalNetAddr // defaults to disabled, or 127.0.0.1:1935
|
||||
RTMPSAddr OptionalNetAddr // defaults to disabled, or 127.0.0.1:1936
|
||||
APIPort int // defaults to 9997
|
||||
Host string // defaults to "localhost"
|
||||
StreamKey StreamKey // defaults to "live"
|
||||
ChanSize int // defaults to 64
|
||||
UpdateStateInterval time.Duration // defaults to 5 seconds
|
||||
ContainerClient *container.Client
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
// OptionalNetAddr is a wrapper around domain.NetAddr that indicates whether it
|
||||
// is enabled or not.
|
||||
type OptionalNetAddr struct {
|
||||
domain.NetAddr
|
||||
|
||||
Enabled bool
|
||||
}
|
||||
|
||||
// NewActor creates a new media server actor.
|
||||
//
|
||||
// Callers must consume the state channel exposed via [C].
|
||||
@ -87,15 +98,12 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error)
|
||||
return nil, fmt.Errorf("build API client: %w", err)
|
||||
}
|
||||
|
||||
rtmpAddr := params.RTMPAddr
|
||||
rtmpAddr.IP = cmp.Or(rtmpAddr.IP, defaultRTMPIP)
|
||||
rtmpAddr.Port = cmp.Or(rtmpAddr.Port, defaultRTMPPort)
|
||||
|
||||
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
|
||||
return &Actor{
|
||||
rtmpAddr: toRTMPAddr(params.RTMPAddr, defaultRTMPPort),
|
||||
rtmpsAddr: toRTMPAddr(params.RTMPSAddr, defaultRTMPSPort),
|
||||
apiPort: cmp.Or(params.APIPort, defaultAPIPort),
|
||||
rtmpAddr: rtmpAddr,
|
||||
rtmpHost: cmp.Or(params.Host, defaultRTMPHost),
|
||||
host: cmp.Or(params.Host, defaultHost),
|
||||
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
|
||||
updateStateInterval: cmp.Or(params.UpdateStateInterval, defaultUpdateStateInterval),
|
||||
tlsCert: tlsCert,
|
||||
@ -112,56 +120,37 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error)
|
||||
}
|
||||
|
||||
func (a *Actor) Start(ctx context.Context) error {
|
||||
apiPortSpec := nat.Port(fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort))
|
||||
rtmpPortSpec := nat.Port(fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935))
|
||||
exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)})
|
||||
|
||||
// The RTMP URL is passed to the UI via the state.
|
||||
// This could be refactored, it's not really stateful data.
|
||||
a.state.RTMPURL = a.RTMPURL()
|
||||
|
||||
cfg, err := yaml.Marshal(
|
||||
Config{
|
||||
LogLevel: "info",
|
||||
LogDestinations: []string{"stdout"},
|
||||
AuthMethod: "internal",
|
||||
AuthInternalUsers: []User{
|
||||
{
|
||||
User: "any",
|
||||
IPs: []string{}, // any IP
|
||||
Permissions: []UserPermission{
|
||||
{Action: "publish"},
|
||||
},
|
||||
},
|
||||
{
|
||||
User: "api",
|
||||
Pass: a.pass,
|
||||
IPs: []string{}, // any IP
|
||||
Permissions: []UserPermission{
|
||||
{Action: "read"},
|
||||
},
|
||||
},
|
||||
{
|
||||
User: "api",
|
||||
Pass: a.pass,
|
||||
IPs: []string{}, // any IP
|
||||
Permissions: []UserPermission{{Action: "api"}},
|
||||
},
|
||||
},
|
||||
API: true,
|
||||
APIEncryption: true,
|
||||
APIServerCert: "/etc/tls.crt",
|
||||
APIServerKey: "/etc/tls.key",
|
||||
Paths: map[string]Path{
|
||||
string(a.streamKey): {Source: "publisher"},
|
||||
},
|
||||
},
|
||||
)
|
||||
if err != nil { // should never happen
|
||||
return fmt.Errorf("marshal config: %w", err)
|
||||
var portSpecs []string
|
||||
portSpecs = append(portSpecs, fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort))
|
||||
if !a.rtmpAddr.IsZero() {
|
||||
portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935))
|
||||
}
|
||||
if !a.rtmpsAddr.IsZero() {
|
||||
portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpsAddr.IP, a.rtmpsAddr.Port, 1936))
|
||||
}
|
||||
exposedPorts, portBindings, err := nat.ParsePortSpecs(portSpecs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse port specs: %w", err)
|
||||
}
|
||||
|
||||
a.logger.Info("Starting media server", "host", a.rtmpHost, "bind_ip", a.rtmpAddr.IP, "bind_port", a.rtmpAddr.Port)
|
||||
cfg, err := a.buildServerConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("build server config: %w", err)
|
||||
}
|
||||
|
||||
args := []any{"host", a.host}
|
||||
if a.rtmpAddr.IsZero() {
|
||||
args = append(args, "rtmp.enabled", false)
|
||||
} else {
|
||||
args = append(args, "rtmp.enabled", true, "rtmp.bind_addr", a.rtmpAddr.IP, "rtmp.bind_port", a.rtmpAddr.Port)
|
||||
}
|
||||
if a.rtmpsAddr.IsZero() {
|
||||
args = append(args, "rtmps.enabled", false)
|
||||
} else {
|
||||
args = append(args, "rtmps.enabled", true, "rtmps.bind_addr", a.rtmpsAddr.IP, "rtmps.bind_port", a.rtmpsAddr.Port)
|
||||
}
|
||||
a.logger.Info("Starting media server", args...)
|
||||
|
||||
containerStateC, errC := a.containerClient.RunContainer(
|
||||
ctx,
|
||||
container.RunContainerParams{
|
||||
@ -224,6 +213,62 @@ func (a *Actor) Start(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Actor) buildServerConfig() ([]byte, error) {
|
||||
// NOTE: Regardless of the user configuration (which mostly affects exposed
|
||||
// ports and UI rendering) plain RTMP must be enabled at the container level,
|
||||
// for internal connections.
|
||||
var encryptionString string
|
||||
if a.rtmpsAddr.IsZero() {
|
||||
encryptionString = "no"
|
||||
} else {
|
||||
encryptionString = "optional"
|
||||
}
|
||||
|
||||
return yaml.Marshal(
|
||||
Config{
|
||||
LogLevel: "debug",
|
||||
LogDestinations: []string{"stdout"},
|
||||
AuthMethod: "internal",
|
||||
AuthInternalUsers: []User{
|
||||
{
|
||||
User: "any",
|
||||
IPs: []string{}, // any IP
|
||||
Permissions: []UserPermission{
|
||||
{Action: "publish"},
|
||||
},
|
||||
},
|
||||
{
|
||||
User: "api",
|
||||
Pass: a.pass,
|
||||
IPs: []string{}, // any IP
|
||||
Permissions: []UserPermission{
|
||||
{Action: "read"},
|
||||
},
|
||||
},
|
||||
{
|
||||
User: "api",
|
||||
Pass: a.pass,
|
||||
IPs: []string{}, // any IP
|
||||
Permissions: []UserPermission{{Action: "api"}},
|
||||
},
|
||||
},
|
||||
RTMP: true,
|
||||
RTMPEncryption: encryptionString,
|
||||
RTMPAddress: ":1935",
|
||||
RTMPSAddress: ":1936",
|
||||
RTMPServerCert: "/etc/tls.crt", // TODO: custom certs
|
||||
RTMPServerKey: "/etc/tls.key", // TODO: custom certs
|
||||
API: true,
|
||||
APIEncryption: true,
|
||||
APIServerCert: "/etc/tls.crt",
|
||||
APIServerKey: "/etc/tls.key",
|
||||
Paths: map[string]Path{
|
||||
string(a.streamKey): {Source: "publisher"},
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// C returns a channel that will receive the current state of the media server.
|
||||
func (s *Actor) C() <-chan domain.Source {
|
||||
return s.stateC
|
||||
@ -329,7 +374,20 @@ func (s *Actor) handleContainerExit(err error) {
|
||||
|
||||
// RTMPURL returns the RTMP URL for the media server, accessible from the host.
|
||||
func (s *Actor) RTMPURL() string {
|
||||
return fmt.Sprintf("rtmp://%s:%d/%s", s.rtmpHost, s.rtmpAddr.Port, s.streamKey)
|
||||
if s.rtmpAddr.IsZero() {
|
||||
return ""
|
||||
}
|
||||
|
||||
return fmt.Sprintf("rtmp://%s:%d/%s", s.host, s.rtmpAddr.Port, s.streamKey)
|
||||
}
|
||||
|
||||
// RTMPSURL returns the RTMPS URL for the media server, accessible from the host.
|
||||
func (s *Actor) RTMPSURL() string {
|
||||
if s.rtmpsAddr.IsZero() {
|
||||
return ""
|
||||
}
|
||||
|
||||
return fmt.Sprintf("rtmps://%s:%d/%s", s.host, s.rtmpsAddr.Port, s.streamKey)
|
||||
}
|
||||
|
||||
// RTMPInternalURL returns the RTMP URL for the media server, accessible from
|
||||
@ -367,3 +425,17 @@ func generatePassword() string {
|
||||
_, _ = rand.Read(p)
|
||||
return fmt.Sprintf("%x", []byte(p))
|
||||
}
|
||||
|
||||
// toRTMPAddr builds a domain.NetAddr from an OptionalNetAddr, with default
|
||||
// values set to RTMP default bind config if needed. If the OptionalNetAddr is
|
||||
// not enabled, a zero value is returned.
|
||||
func toRTMPAddr(a OptionalNetAddr, defaultPort int) domain.NetAddr {
|
||||
if !a.Enabled {
|
||||
return domain.NetAddr{}
|
||||
}
|
||||
|
||||
return domain.NetAddr{
|
||||
IP: cmp.Or(a.IP, defaultRTMPIP),
|
||||
Port: cmp.Or(a.Port, defaultPort),
|
||||
}
|
||||
}
|
||||
|
@ -17,8 +17,12 @@ type Config struct {
|
||||
APIEncryption bool `yaml:"apiEncryption,omitempty"`
|
||||
APIServerCert string `yaml:"apiServerCert,omitempty"`
|
||||
APIServerKey string `yaml:"apiServerKey,omitempty"`
|
||||
RTMP bool `yaml:"rtmp,omitempty"`
|
||||
RTMP bool `yaml:"rtmp"`
|
||||
RTMPEncryption string `yaml:"rtmpEncryption,omitempty"`
|
||||
RTMPAddress string `yaml:"rtmpAddress,omitempty"`
|
||||
RTMPSAddress string `yaml:"rtmpsAddress,omitempty"`
|
||||
RTMPServerCert string `yaml:"rtmpServerCert,omitempty"`
|
||||
RTMPServerKey string `yaml:"rtmpServerKey,omitempty"`
|
||||
HLS bool `yaml:"hls"`
|
||||
RTSP bool `yaml:"rtsp"`
|
||||
WebRTC bool `yaml:"webrtc"`
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
)
|
||||
|
||||
type sourceViews struct {
|
||||
url *tview.TextView
|
||||
status *tview.TextView
|
||||
tracks *tview.TextView
|
||||
health *tview.TextView
|
||||
@ -44,6 +43,7 @@ type UI struct {
|
||||
commandC chan Command
|
||||
clipboardAvailable bool
|
||||
configFilePath string
|
||||
rtmpURL, rtmpsURL string
|
||||
buildInfo domain.BuildInfo
|
||||
logger *slog.Logger
|
||||
|
||||
@ -57,6 +57,7 @@ type UI struct {
|
||||
sourceViews sourceViews
|
||||
destView *tview.Table
|
||||
noDestView *tview.TextView
|
||||
aboutView *tview.Flex
|
||||
pullProgressModal *tview.Modal
|
||||
|
||||
// other mutable state
|
||||
@ -127,8 +128,8 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
||||
sourceView := tview.NewFlex()
|
||||
sourceView.SetDirection(tview.FlexColumn)
|
||||
sourceView.SetBorder(true)
|
||||
sourceView.SetTitle("Source RTMP server")
|
||||
sidebar.AddItem(sourceView, 9, 0, false)
|
||||
sourceView.SetTitle("Source")
|
||||
sidebar.AddItem(sourceView, 8, 0, false)
|
||||
|
||||
leftCol := tview.NewFlex()
|
||||
leftCol.SetDirection(tview.FlexRow)
|
||||
@ -137,11 +138,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
||||
sourceView.AddItem(leftCol, 9, 0, false)
|
||||
sourceView.AddItem(rightCol, 0, 1, false)
|
||||
|
||||
urlHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerURL)
|
||||
leftCol.AddItem(urlHeaderTextView, 1, 0, false)
|
||||
urlTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
|
||||
rightCol.AddItem(urlTextView, 1, 0, false)
|
||||
|
||||
statusHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerStatus)
|
||||
leftCol.AddItem(statusHeaderTextView, 1, 0, false)
|
||||
statusTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
|
||||
@ -176,13 +172,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
||||
aboutView.SetDirection(tview.FlexRow)
|
||||
aboutView.SetBorder(true)
|
||||
aboutView.SetTitle("Actions")
|
||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]a[-] Add destination"), 1, 0, false)
|
||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Del[-] Remove destination"), 1, 0, false)
|
||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Space[-] Start/stop destination"), 1, 0, false)
|
||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText(""), 1, 0, false)
|
||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]u[-] Copy source RTMP URL"), 1, 0, false)
|
||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
|
||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
|
||||
|
||||
sidebar.AddItem(aboutView, 0, 1, false)
|
||||
|
||||
@ -232,7 +221,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
||||
pages: pages,
|
||||
container: container,
|
||||
sourceViews: sourceViews{
|
||||
url: urlTextView,
|
||||
status: statusTextView,
|
||||
tracks: tracksTextView,
|
||||
health: healthTextView,
|
||||
@ -242,6 +230,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
||||
},
|
||||
destView: destView,
|
||||
noDestView: noDestView,
|
||||
aboutView: aboutView,
|
||||
pullProgressModal: pullProgressModal,
|
||||
urlsToStartState: make(map[string]startState),
|
||||
}
|
||||
@ -254,6 +243,30 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
||||
return ui, nil
|
||||
}
|
||||
|
||||
func (ui *UI) renderAboutView() {
|
||||
ui.aboutView.Clear()
|
||||
|
||||
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]a[-] Add destination"), 1, 0, false)
|
||||
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Del[-] Remove destination"), 1, 0, false)
|
||||
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Space[-] Start/stop destination"), 1, 0, false)
|
||||
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText(""), 1, 0, false)
|
||||
|
||||
i := 1
|
||||
if ui.rtmpURL != "" {
|
||||
rtmpURLView := tview.NewTextView().SetDynamicColors(true).SetText(fmt.Sprintf("[grey]F%d[-] Copy source RTMP URL", i))
|
||||
ui.aboutView.AddItem(rtmpURLView, 1, 0, false)
|
||||
i++
|
||||
}
|
||||
|
||||
if ui.rtmpsURL != "" {
|
||||
rtmpsURLView := tview.NewTextView().SetDynamicColors(true).SetText(fmt.Sprintf("[grey]F%d[-] Copy source RTMPS URL", i))
|
||||
ui.aboutView.AddItem(rtmpsURLView, 1, 0, false)
|
||||
}
|
||||
|
||||
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
|
||||
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
|
||||
}
|
||||
|
||||
// C returns a channel that receives commands from the user interface.
|
||||
func (ui *UI) C() <-chan Command {
|
||||
return ui.commandC
|
||||
@ -283,6 +296,17 @@ func (ui *UI) run(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// SetRTMPURLs sets the RTMP and RTMPS URLs for the user interface, which are
|
||||
// unavailable when the UI is first created.
|
||||
func (ui *UI) SetRTMPURLs(rtmpURL, rtmpsURL string) {
|
||||
ui.mu.Lock()
|
||||
ui.rtmpURL = rtmpURL
|
||||
ui.rtmpsURL = rtmpsURL
|
||||
ui.mu.Unlock()
|
||||
|
||||
ui.renderAboutView()
|
||||
}
|
||||
|
||||
func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
||||
// Special case: handle CTRL-C even when a modal is visible.
|
||||
if event.Key() == tcell.KeyCtrlC {
|
||||
@ -309,8 +333,6 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
||||
return nil
|
||||
case ' ':
|
||||
ui.toggleDestination()
|
||||
case 'u', 'U':
|
||||
ui.copySourceURLToClipboard(ui.clipboardAvailable)
|
||||
case 'c', 'C':
|
||||
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
|
||||
case '?':
|
||||
@ -318,6 +340,8 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
||||
case 'k': // tview vim bindings
|
||||
handleKeyUp()
|
||||
}
|
||||
case tcell.KeyF1, tcell.KeyF2:
|
||||
ui.fkeyHandler(event.Key())
|
||||
case tcell.KeyDelete, tcell.KeyBackspace, tcell.KeyBackspace2:
|
||||
ui.removeDestination()
|
||||
return nil
|
||||
@ -328,11 +352,34 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
||||
return event
|
||||
}
|
||||
|
||||
func (ui *UI) fkeyHandler(key tcell.Key) {
|
||||
var urls []string
|
||||
if ui.rtmpURL != "" {
|
||||
urls = append(urls, ui.rtmpURL)
|
||||
}
|
||||
if ui.rtmpsURL != "" {
|
||||
urls = append(urls, ui.rtmpsURL)
|
||||
}
|
||||
|
||||
switch key {
|
||||
case tcell.KeyF1:
|
||||
if len(urls) == 0 {
|
||||
return
|
||||
}
|
||||
ui.copySourceURLToClipboard(urls[0])
|
||||
case tcell.KeyF2:
|
||||
if len(urls) < 2 {
|
||||
return
|
||||
}
|
||||
ui.copySourceURLToClipboard(urls[1])
|
||||
}
|
||||
}
|
||||
|
||||
func (ui *UI) ShowSourceNotLiveModal() {
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
ui.showModal(
|
||||
pageNameModalStartupCheck,
|
||||
fmt.Sprintf("Waiting for stream.\nStart streaming to the source URL then try again:\n\n%s", ui.sourceViews.url.GetText(true)),
|
||||
pageNameModalNotLive,
|
||||
"Waiting for stream.\n\nStart streaming to a source URL then try again.",
|
||||
[]string{"Ok"},
|
||||
false,
|
||||
nil,
|
||||
@ -519,6 +566,7 @@ func (ui *UI) updateProgressModal(container domain.Container) {
|
||||
const (
|
||||
pageNameMain = "main"
|
||||
pageNameAddDestination = "add-destination"
|
||||
pageNameViewURLs = "view-urls"
|
||||
pageNameConfigUpdateFailed = "modal-config-update-failed"
|
||||
pageNameNoDestinations = "no-destinations"
|
||||
pageNameModalAbout = "modal-about"
|
||||
@ -530,6 +578,7 @@ const (
|
||||
pageNameModalRemoveDestination = "modal-remove-destination"
|
||||
pageNameModalSourceError = "modal-source-error"
|
||||
pageNameModalStartupCheck = "modal-startup-check"
|
||||
pageNameModalNotLive = "modal-not-live"
|
||||
)
|
||||
|
||||
// modalVisible returns true if any modal, including the add destination form,
|
||||
@ -696,8 +745,6 @@ func (ui *UI) redrawFromState(state domain.AppState) {
|
||||
SetSelectable(false)
|
||||
}
|
||||
|
||||
ui.sourceViews.url.SetText(cmp.Or(state.Source.RTMPURL, dash))
|
||||
|
||||
tracks := dash
|
||||
if state.Source.Live && len(state.Source.Tracks) > 0 {
|
||||
tracks = strings.Join(state.Source.Tracks, ", ")
|
||||
@ -971,13 +1018,12 @@ func (ui *UI) toggleDestination() {
|
||||
}
|
||||
}
|
||||
|
||||
func (ui *UI) copySourceURLToClipboard(clipboardAvailable bool) {
|
||||
func (ui *UI) copySourceURLToClipboard(url string) {
|
||||
var text string
|
||||
|
||||
url := ui.sourceViews.url.GetText(true)
|
||||
if clipboardAvailable {
|
||||
if ui.clipboardAvailable {
|
||||
clipboard.Write(clipboard.FmtText, []byte(url))
|
||||
text = "Source URL copied to clipboard:\n\n" + url
|
||||
text = "URL copied to clipboard:\n\n" + url
|
||||
} else {
|
||||
text = "Copy to clipboard not available:\n\n" + url
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user