Compare commits
2 Commits
60c7fb458b
...
98d93ad286
Author | SHA1 | Date | |
---|---|---|---|
|
98d93ad286 | ||
|
5f026be769 |
16
README.md
16
README.md
@ -97,13 +97,17 @@ logfile:
|
|||||||
enabled: true # defaults to false
|
enabled: true # defaults to false
|
||||||
path: /path/to/logfile # defaults to $XDG_STATE_HOME/octoplex/octoplex.log
|
path: /path/to/logfile # defaults to $XDG_STATE_HOME/octoplex/octoplex.log
|
||||||
sources:
|
sources:
|
||||||
rtmp:
|
mediaServer:
|
||||||
enabled: true # must be true
|
|
||||||
streamKey: live # defaults to "live"
|
streamKey: live # defaults to "live"
|
||||||
host: rtmp.example.com # defaults to "localhost"
|
host: rtmp.example.com # defaults to "localhost"
|
||||||
bindAddr: # optional
|
rtmp:
|
||||||
ip: 0.0.0.0 # defaults to 127.0.0.1
|
enabled: true # defaults to false
|
||||||
|
ip: 127.0.0.1 # defaults to 127.0.0.1
|
||||||
port: 1935 # defaults to 1935
|
port: 1935 # defaults to 1935
|
||||||
|
rtmps:
|
||||||
|
enabled: true # defaults to false
|
||||||
|
ip: 0.0.0.0 # defaults to 127.0.0.1
|
||||||
|
port: 1936 # defaults to 1936
|
||||||
destinations:
|
destinations:
|
||||||
- name: YouTube # Destination name, used only for display
|
- name: YouTube # Destination name, used only for display
|
||||||
url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key
|
url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key
|
||||||
@ -115,8 +119,8 @@ destinations:
|
|||||||
:information_source: It is also possible to add and remove destinations directly from the
|
:information_source: It is also possible to add and remove destinations directly from the
|
||||||
terminal user interface.
|
terminal user interface.
|
||||||
|
|
||||||
:warning: `sources.rtmp.bindAddr.ip` must be set to a valid IP address if you want
|
:warning: `sources.mediaServer.rtmp.ip` must be set to a valid IP address if
|
||||||
to accept connections from other hosts. Leave it blank to bind only to
|
you want to accept connections from other hosts. Leave it blank to bind only to
|
||||||
localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces.
|
localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces.
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
@ -38,9 +38,9 @@ func Run(ctx context.Context, params RunParams) error {
|
|||||||
state := new(domain.AppState)
|
state := new(domain.AppState)
|
||||||
applyConfig(cfg, state)
|
applyConfig(cfg, state)
|
||||||
|
|
||||||
// While RTMP is the only source, it doesn't make sense to disable it.
|
// Ensure there is at least one active source.
|
||||||
if !cfg.Sources.RTMP.Enabled {
|
if !cfg.Sources.MediaServer.RTMP.Enabled && !cfg.Sources.MediaServer.RTMPS.Enabled {
|
||||||
return errors.New("config: sources.rtmp.enabled must be set to true")
|
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
|
||||||
}
|
}
|
||||||
|
|
||||||
logger := params.Logger
|
logger := params.Logger
|
||||||
@ -89,9 +89,10 @@ func Run(ctx context.Context, params RunParams) error {
|
|||||||
updateUI()
|
updateUI()
|
||||||
|
|
||||||
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
|
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
|
||||||
RTMPAddr: domain.NetAddr(cfg.Sources.RTMP.BindAddr),
|
RTMPAddr: buildNetAddr(cfg.Sources.MediaServer.RTMP),
|
||||||
RTMPHost: cfg.Sources.RTMP.Host,
|
RTMPSAddr: buildNetAddr(cfg.Sources.MediaServer.RTMPS),
|
||||||
StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey),
|
Host: cfg.Sources.MediaServer.Host,
|
||||||
|
StreamKey: mediaserver.StreamKey(cfg.Sources.MediaServer.StreamKey),
|
||||||
ContainerClient: containerClient,
|
ContainerClient: containerClient,
|
||||||
Logger: logger.With("component", "mediaserver"),
|
Logger: logger.With("component", "mediaserver"),
|
||||||
})
|
})
|
||||||
@ -104,6 +105,10 @@ func Run(ctx context.Context, params RunParams) error {
|
|||||||
}
|
}
|
||||||
defer srv.Close()
|
defer srv.Close()
|
||||||
|
|
||||||
|
// Set the RTMP and RTMPS URLs in the UI, which are only known after the
|
||||||
|
// MediaServer is available.
|
||||||
|
ui.SetRTMPURLs(srv.RTMPURL(), srv.RTMPSURL())
|
||||||
|
|
||||||
repl := replicator.StartActor(ctx, replicator.StartActorParams{
|
repl := replicator.StartActor(ctx, replicator.StartActorParams{
|
||||||
SourceURL: srv.RTMPInternalURL(),
|
SourceURL: srv.RTMPInternalURL(),
|
||||||
ContainerClient: containerClient,
|
ContainerClient: containerClient,
|
||||||
@ -321,3 +326,12 @@ func doStartupCheck(ctx context.Context, containerClient *container.Client, show
|
|||||||
|
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config.
|
||||||
|
func buildNetAddr(src config.RTMPSource) mediaserver.OptionalNetAddr {
|
||||||
|
if !src.Enabled {
|
||||||
|
return mediaserver.OptionalNetAddr{Enabled: false}
|
||||||
|
}
|
||||||
|
|
||||||
|
return mediaserver.OptionalNetAddr{Enabled: true, NetAddr: domain.NetAddr(src.NetAddr)}
|
||||||
|
}
|
||||||
|
@ -85,6 +85,8 @@ func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- termina
|
|||||||
lines[y] += string(screenCells[n].Runes[0])
|
lines[y] += string(screenCells[n].Runes[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
require.GreaterOrEqual(t, len(lines), 5, "Screen contents should have at least 5 lines")
|
||||||
|
|
||||||
return lines
|
return lines
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,13 +29,41 @@ import (
|
|||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const waitTime = time.Minute
|
||||||
|
|
||||||
func TestIntegration(t *testing.T) {
|
func TestIntegration(t *testing.T) {
|
||||||
t.Run("with default host, port and stream key", func(t *testing.T) {
|
t.Run("RTMP with default host, port and stream key", func(t *testing.T) {
|
||||||
testIntegration(t, "", "", 0, "")
|
testIntegration(t, config.MediaServerSource{
|
||||||
|
RTMP: config.RTMPSource{Enabled: true},
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("with custom host, port and stream key", func(t *testing.T) {
|
t.Run("RTMPS with default host, port and stream key", func(t *testing.T) {
|
||||||
testIntegration(t, "localhost", "0.0.0.0", 3000, "s0meK3y")
|
testIntegration(t, config.MediaServerSource{
|
||||||
|
RTMPS: config.RTMPSource{Enabled: true},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("RTMP with custom host, port and stream key", func(t *testing.T) {
|
||||||
|
testIntegration(t, config.MediaServerSource{
|
||||||
|
StreamKey: "s0meK3y",
|
||||||
|
Host: "localhost",
|
||||||
|
RTMP: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: config.NetAddr{IP: "0.0.0.0", Port: 3000},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("RTMPS with custom host, port and stream key", func(t *testing.T) {
|
||||||
|
testIntegration(t, config.MediaServerSource{
|
||||||
|
StreamKey: "an0therK3y",
|
||||||
|
Host: "localhost",
|
||||||
|
RTMPS: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: config.NetAddr{IP: "0.0.0.0", Port: 443},
|
||||||
|
},
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,27 +73,39 @@ func TestIntegration(t *testing.T) {
|
|||||||
// https://stackoverflow.com/a/60740997/62871
|
// https://stackoverflow.com/a/60740997/62871
|
||||||
const hostIP = "172.17.0.1"
|
const hostIP = "172.17.0.1"
|
||||||
|
|
||||||
func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, streamKey string) {
|
func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
|
||||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
wantRTMPHost := cmp.Or(rtmpHost, "localhost")
|
var rtmpConfig config.RTMPSource
|
||||||
wantRTMPPort := cmp.Or(rtmpPort, 1935)
|
var proto string
|
||||||
wantStreamKey := cmp.Or(streamKey, "live")
|
var defaultPort int
|
||||||
wantRTMPURL := fmt.Sprintf("rtmp://%s:%d/%s", wantRTMPHost, wantRTMPPort, wantStreamKey)
|
if mediaServerConfig.RTMP.Enabled {
|
||||||
|
rtmpConfig = mediaServerConfig.RTMP
|
||||||
|
proto = "rtmp://"
|
||||||
|
defaultPort = 1935
|
||||||
|
} else {
|
||||||
|
rtmpConfig = mediaServerConfig.RTMPS
|
||||||
|
proto = "rtmps://"
|
||||||
|
defaultPort = 1936
|
||||||
|
}
|
||||||
|
|
||||||
|
wantHost := cmp.Or(mediaServerConfig.Host, "localhost")
|
||||||
|
wantRTMPPort := cmp.Or(rtmpConfig.Port, defaultPort)
|
||||||
|
wantStreamKey := cmp.Or(mediaServerConfig.StreamKey, "live")
|
||||||
|
wantRTMPURL := fmt.Sprintf("%s%s:%d/%s", proto, wantHost, wantRTMPPort, wantStreamKey)
|
||||||
|
|
||||||
destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
ContainerRequest: testcontainers.ContainerRequest{
|
||||||
Image: "bluenviron/mediamtx:latest",
|
Image: "bluenviron/mediamtx:latest",
|
||||||
Env: map[string]string{"MTX_RTMPADDRESS": ":1936"},
|
ExposedPorts: []string{"1935/tcp"},
|
||||||
ExposedPorts: []string{"1936/tcp"},
|
WaitingFor: wait.ForListeningPort("1935/tcp"),
|
||||||
WaitingFor: wait.ForListeningPort("1936/tcp"),
|
|
||||||
},
|
},
|
||||||
Started: true,
|
Started: true,
|
||||||
})
|
})
|
||||||
testcontainers.CleanupContainer(t, destServer)
|
testcontainers.CleanupContainer(t, destServer)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
destServerPort, err := destServer.MappedPort(ctx, "1936/tcp")
|
destServerPort, err := destServer.MappedPort(ctx, "1935/tcp")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
||||||
@ -74,16 +114,10 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
|||||||
|
|
||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
destURL1 := fmt.Sprintf("rtmp://%s:%d/%s/dest1", hostIP, destServerPort.Int(), wantStreamKey)
|
destURL1 := fmt.Sprintf("rtmp://%s:%d/live/dest1", hostIP, destServerPort.Int())
|
||||||
destURL2 := fmt.Sprintf("rtmp://%s:%d/%s/dest2", hostIP, destServerPort.Int(), wantStreamKey)
|
destURL2 := fmt.Sprintf("rtmp://%s:%d/live/dest2", hostIP, destServerPort.Int())
|
||||||
configService := setupConfigService(t, config.Config{
|
configService := setupConfigService(t, config.Config{
|
||||||
Sources: config.Sources{
|
Sources: config.Sources{MediaServer: mediaServerConfig},
|
||||||
RTMP: config.RTMPSource{
|
|
||||||
Enabled: true,
|
|
||||||
Host: rtmpHost,
|
|
||||||
BindAddr: config.NetAddr{IP: rtmpIP, Port: rtmpPort},
|
|
||||||
StreamKey: streamKey,
|
|
||||||
}},
|
|
||||||
// Load one destination from config, add the other in-app.
|
// Load one destination from config, add the other in-app.
|
||||||
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
|
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
|
||||||
})
|
})
|
||||||
@ -112,13 +146,12 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected the mediaserver to start",
|
"expected the mediaserver to start",
|
||||||
)
|
)
|
||||||
@ -129,16 +162,14 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[1], "URL "+wantRTMPURL, "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to receive an ingress stream",
|
"expected to receive an ingress stream",
|
||||||
)
|
)
|
||||||
@ -158,22 +189,21 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air")
|
assert.Contains(c, contents[3], "off-air", "expected local server 0 to be off-air")
|
||||||
|
|
||||||
require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present")
|
require.Contains(c, contents[3], "Local server 2", "expected local server 2 to be present")
|
||||||
assert.Contains(t, contents[3], "off-air", "expected local server 2 to be off-air")
|
assert.Contains(c, contents[3], "off-air", "expected local server 2 to be off-air")
|
||||||
|
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to add the destinations",
|
"expected to add the destinations",
|
||||||
)
|
)
|
||||||
@ -186,23 +216,22 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||||
|
|
||||||
require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present")
|
require.Contains(c, contents[3], "Local server 2", "expected local server 2 to be present")
|
||||||
assert.Contains(t, contents[3], "sending", "expected local server 2 to be sending")
|
assert.Contains(c, contents[3], "sending", "expected local server 2 to be sending")
|
||||||
assert.Contains(t, contents[3], "healthy", "expected local server 2 to be healthy")
|
assert.Contains(c, contents[3], "healthy", "expected local server 2 to be healthy")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to start the destination streams",
|
"expected to start the destination streams",
|
||||||
)
|
)
|
||||||
@ -214,22 +243,21 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||||
|
|
||||||
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||||
|
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to remove the second destination",
|
"expected to remove the second destination",
|
||||||
)
|
)
|
||||||
@ -240,16 +268,15 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int,
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited")
|
assert.Contains(c, contents[2], "exited", "expected local server 1 to have exited")
|
||||||
|
|
||||||
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to stop the first destination stream",
|
"expected to stop the first destination stream",
|
||||||
)
|
)
|
||||||
@ -275,9 +302,9 @@ func TestIntegrationCustomRTMPURL(t *testing.T) {
|
|||||||
|
|
||||||
configService := setupConfigService(t, config.Config{
|
configService := setupConfigService(t, config.Config{
|
||||||
Sources: config.Sources{
|
Sources: config.Sources{
|
||||||
RTMP: config.RTMPSource{
|
MediaServer: config.MediaServerSource{
|
||||||
Enabled: true,
|
|
||||||
Host: "rtmp.live.tv",
|
Host: "rtmp.live.tv",
|
||||||
|
RTMP: config.RTMPSource{Enabled: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -292,12 +319,15 @@ func TestIntegrationCustomRTMPURL(t *testing.T) {
|
|||||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
sendKey(t, screen, tcell.KeyF1, ' ')
|
||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(t *assert.CollectT) {
|
||||||
assert.True(t, contentsIncludes(getContents(), "URL rtmp://rtmp.live.tv:1935/live"), "expected to see custom host name")
|
assert.True(t, contentsIncludes(getContents(), "rtmp://rtmp.live.tv:1935/live"), "expected to see custom host name")
|
||||||
},
|
},
|
||||||
5*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see custom host name",
|
"expected to see custom host name",
|
||||||
)
|
)
|
||||||
@ -307,6 +337,7 @@ func TestIntegrationCustomRTMPURL(t *testing.T) {
|
|||||||
|
|
||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIntegrationRestartDestination(t *testing.T) {
|
func TestIntegrationRestartDestination(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -336,7 +367,7 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{
|
configService := setupConfigService(t, config.Config{
|
||||||
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}},
|
Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}},
|
||||||
Destinations: []config.Destination{{
|
Destinations: []config.Destination{{
|
||||||
Name: "Local server 1",
|
Name: "Local server 1",
|
||||||
URL: fmt.Sprintf("rtmp://%s:%d/live", hostIP, destServerRTMPPort.Int()),
|
URL: fmt.Sprintf("rtmp://%s:%d/live", hostIP, destServerRTMPPort.Int()),
|
||||||
@ -354,13 +385,12 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected the mediaserver to start",
|
"expected the mediaserver to start",
|
||||||
)
|
)
|
||||||
@ -371,13 +401,12 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to receive an ingress stream",
|
"expected to receive an ingress stream",
|
||||||
)
|
)
|
||||||
@ -388,17 +417,16 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to start the destination stream",
|
"expected to start the destination stream",
|
||||||
)
|
)
|
||||||
@ -411,17 +439,16 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air")
|
assert.Contains(c, contents[2], "off-air", "expected local server 1 to be off-air")
|
||||||
assert.Contains(t, contents[2], "restarting", "expected local server 1 to be restarting")
|
assert.Contains(c, contents[2], "restarting", "expected local server 1 to be restarting")
|
||||||
},
|
},
|
||||||
20*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to begin restarting",
|
"expected to begin restarting",
|
||||||
)
|
)
|
||||||
@ -429,17 +456,16 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 4 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to restart the destination stream",
|
"expected to restart the destination stream",
|
||||||
)
|
)
|
||||||
@ -450,16 +476,15 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 4 lines of output")
|
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited")
|
assert.Contains(c, contents[2], "exited", "expected local server 1 to have exited")
|
||||||
|
|
||||||
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to stop the destination stream",
|
"expected to stop the destination stream",
|
||||||
)
|
)
|
||||||
@ -482,7 +507,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
|||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{
|
configService := setupConfigService(t, config.Config{
|
||||||
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}},
|
Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}},
|
||||||
Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}},
|
Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}},
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -497,13 +522,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected the mediaserver to start",
|
"expected the mediaserver to start",
|
||||||
)
|
)
|
||||||
@ -514,13 +538,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to receive an ingress stream",
|
"expected to receive an ingress stream",
|
||||||
)
|
)
|
||||||
@ -531,12 +554,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
assert.True(t, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error")
|
assert.True(c, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error")
|
||||||
assert.True(t, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error")
|
assert.True(c, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see the destination start error modal",
|
"expected to see the destination start error modal",
|
||||||
)
|
)
|
||||||
@ -558,7 +581,7 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{
|
configService := setupConfigService(t, config.Config{
|
||||||
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true, StreamKey: "live"}},
|
Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: config.RTMPSource{Enabled: true}}},
|
||||||
})
|
})
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -572,14 +595,14 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
require.True(c, len(contents) > 2, "expected at least 3 lines of output")
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||||
assert.True(t, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to see no destinations message")
|
assert.True(c, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to see no destinations message")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected the mediaserver to start",
|
"expected the mediaserver to start",
|
||||||
)
|
)
|
||||||
@ -593,13 +616,13 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
|
|
||||||
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||||
assert.True(t, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
assert.True(c, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
||||||
},
|
},
|
||||||
10*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected a validation error for an empty URL",
|
"expected a validation error for an empty URL",
|
||||||
)
|
)
|
||||||
@ -613,13 +636,13 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
|
|
||||||
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||||
assert.True(t, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
assert.True(c, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
||||||
},
|
},
|
||||||
10*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected a validation error for an invalid URL",
|
"expected a validation error for an invalid URL",
|
||||||
)
|
)
|
||||||
@ -634,15 +657,14 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
require.Contains(t, contents[2], "My stream", "expected new destination to be present")
|
require.Contains(c, contents[2], "My stream", "expected new destination to be present")
|
||||||
assert.Contains(t, contents[2], "off-air", "expected new destination to be off-air")
|
assert.Contains(c, contents[2], "off-air", "expected new destination to be off-air")
|
||||||
assert.False(t, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to not see no destinations message")
|
assert.False(c, contentsIncludes(contents, "No destinations added yet"), "expected to not see no destinations message")
|
||||||
},
|
},
|
||||||
10*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to add the destination",
|
"expected to add the destination",
|
||||||
)
|
)
|
||||||
@ -660,13 +682,13 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
|
|
||||||
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||||
assert.True(t, contentsIncludes(contents, "validate: duplicate destination URL: rtmp://"), "expected to see config update error")
|
assert.True(c, contentsIncludes(contents, "validate: duplicate destination URL: rtmp://"), "expected to see config update error")
|
||||||
},
|
},
|
||||||
10*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected a validation error for a duplicate URL",
|
"expected a validation error for a duplicate URL",
|
||||||
)
|
)
|
||||||
@ -701,7 +723,7 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
|||||||
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}})
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -715,10 +737,10 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
assert.True(t, contentsIncludes(getContents(), "Another instance of Octoplex may already be running."), "expected to see startup check modal")
|
assert.True(c, contentsIncludes(getContents(), "Another instance of Octoplex may already be running."), "expected to see startup check modal")
|
||||||
},
|
},
|
||||||
30*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see startup check modal",
|
"expected to see startup check modal",
|
||||||
)
|
)
|
||||||
@ -728,13 +750,13 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
_, err := staleContainer.State(context.Background())
|
_, err := staleContainer.State(context.Background())
|
||||||
// IsRunning() does not work, probably because we're undercutting the
|
// IsRunning() does not work, probably because we're undercutting the
|
||||||
// testcontainers API.
|
// testcontainers API.
|
||||||
require.True(t, errdefs.IsNotFound(err), "expected to not find the container")
|
require.True(c, errdefs.IsNotFound(err), "expected to not find the container")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
2*time.Second,
|
2*time.Second,
|
||||||
"expected to quit the other containers",
|
"expected to quit the other containers",
|
||||||
)
|
)
|
||||||
@ -742,13 +764,13 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
require.True(c, len(contents) > 2, "expected at least 3 lines of output")
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||||
},
|
},
|
||||||
10*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected the mediaserver to start",
|
"expected the mediaserver to start",
|
||||||
)
|
)
|
||||||
@ -770,7 +792,7 @@ func TestIntegrationMediaServerError(t *testing.T) {
|
|||||||
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}})
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -784,11 +806,11 @@ func TestIntegrationMediaServerError(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
assert.True(t, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title")
|
assert.True(c, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title")
|
||||||
assert.True(t, contentsIncludes(getContents(), "address already in use"), "expected to see message")
|
assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see media server error modal",
|
"expected to see media server error modal",
|
||||||
)
|
)
|
||||||
@ -809,7 +831,7 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
|||||||
var dockerClient mocks.DockerClient
|
var dockerClient mocks.DockerClient
|
||||||
dockerClient.EXPECT().NetworkCreate(mock.Anything, mock.Anything, mock.Anything).Return(network.CreateResponse{}, errors.New("boom"))
|
dockerClient.EXPECT().NetworkCreate(mock.Anything, mock.Anything, mock.Anything).Return(network.CreateResponse{}, errors.New("boom"))
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}})
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -827,11 +849,11 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
assert.True(t, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
assert.True(c, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
||||||
assert.True(t, contentsIncludes(getContents(), "create container client: network create: boom"), "expected to see message")
|
assert.True(c, contentsIncludes(getContents(), "create container client: network create: boom"), "expected to see message")
|
||||||
},
|
},
|
||||||
5*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see fatal error modal",
|
"expected to see fatal error modal",
|
||||||
)
|
)
|
||||||
@ -842,6 +864,7 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
|||||||
|
|
||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIntegrationDockerConnectionError(t *testing.T) {
|
func TestIntegrationDockerConnectionError(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -850,7 +873,7 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
|||||||
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost("http://docker.example.com"))
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost("http://docker.example.com"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}})
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -866,11 +889,11 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
assert.True(t, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
assert.True(c, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
||||||
assert.True(t, contentsIncludes(getContents(), "Could not connect to Docker. Is Docker installed"), "expected to see message")
|
assert.True(c, contentsIncludes(getContents(), "Could not connect to Docker. Is Docker installed"), "expected to see message")
|
||||||
},
|
},
|
||||||
5*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see fatal error modal",
|
"expected to see fatal error modal",
|
||||||
)
|
)
|
||||||
@ -881,3 +904,103 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
|||||||
|
|
||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIntegrationCopyURLs(t *testing.T) {
|
||||||
|
type binding struct {
|
||||||
|
key tcell.Key
|
||||||
|
content string
|
||||||
|
url string
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
mediaServerConfig config.MediaServerSource
|
||||||
|
wantBindings []binding
|
||||||
|
wantNot []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "RTMP only",
|
||||||
|
mediaServerConfig: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}},
|
||||||
|
wantBindings: []binding{
|
||||||
|
{
|
||||||
|
key: tcell.KeyF1,
|
||||||
|
content: "F1 Copy source RTMP URL",
|
||||||
|
url: "rtmp://localhost:1935/live",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantNot: []string{"F2"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "RTMPS only",
|
||||||
|
mediaServerConfig: config.MediaServerSource{RTMPS: config.RTMPSource{Enabled: true}},
|
||||||
|
wantBindings: []binding{
|
||||||
|
{
|
||||||
|
key: tcell.KeyF1,
|
||||||
|
content: "F1 Copy source RTMPS URL",
|
||||||
|
url: "rtmps://localhost:1936/live",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantNot: []string{"F2"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "RTMP and RTMPS",
|
||||||
|
mediaServerConfig: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}, RTMPS: config.RTMPSource{Enabled: true}},
|
||||||
|
wantBindings: []binding{
|
||||||
|
{
|
||||||
|
key: tcell.KeyF1,
|
||||||
|
content: "F1 Copy source RTMP URL",
|
||||||
|
url: "rtmp://localhost:1935/live",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: tcell.KeyF2,
|
||||||
|
content: "F2 Copy source RTMPS URL",
|
||||||
|
url: "rtmps://localhost:1936/live",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
||||||
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: tc.mediaServerConfig}})
|
||||||
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
printScreen(t, getContents, "Ater loading the app")
|
||||||
|
|
||||||
|
for _, want := range tc.wantBindings {
|
||||||
|
assert.True(t, contentsIncludes(getContents(), want.content), "expected to see %q", want)
|
||||||
|
|
||||||
|
sendKey(t, screen, want.key, ' ')
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
assert.True(t, contentsIncludes(getContents(), want.url), "expected to see copied message")
|
||||||
|
|
||||||
|
sendKey(t, screen, tcell.KeyEscape, ' ')
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, wantNot := range tc.wantNot {
|
||||||
|
assert.False(t, contentsIncludes(getContents(), wantNot), "expected to not see %q", wantNot)
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
<-done
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -31,14 +31,21 @@ type NetAddr struct {
|
|||||||
// RTMPSource holds the configuration for the RTMP source.
|
// RTMPSource holds the configuration for the RTMP source.
|
||||||
type RTMPSource struct {
|
type RTMPSource struct {
|
||||||
Enabled bool `yaml:"enabled"`
|
Enabled bool `yaml:"enabled"`
|
||||||
|
|
||||||
|
NetAddr `yaml:",inline"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// MediaServerSource holds the configuration for the media server source.
|
||||||
|
type MediaServerSource struct {
|
||||||
StreamKey string `yaml:"streamKey,omitempty"`
|
StreamKey string `yaml:"streamKey,omitempty"`
|
||||||
Host string `yaml:"host,omitempty"`
|
Host string `yaml:"host,omitempty"`
|
||||||
BindAddr NetAddr `yaml:"bindAddr,omitempty"`
|
RTMP RTMPSource `yaml:"rtmp,omitempty"`
|
||||||
|
RTMPS RTMPSource `yaml:"rtmps,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sources holds the configuration for the sources.
|
// Sources holds the configuration for the sources.
|
||||||
type Sources struct {
|
type Sources struct {
|
||||||
RTMP RTMPSource `yaml:"rtmp"`
|
MediaServer MediaServerSource `yaml:"mediaServer"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config holds the configuration for the application.
|
// Config holds the configuration for the application.
|
||||||
|
@ -180,10 +180,13 @@ func (s *Service) writeConfig(cfgBytes []byte) error {
|
|||||||
// populateConfigOnBuild is called to set default values for a new, empty
|
// populateConfigOnBuild is called to set default values for a new, empty
|
||||||
// configuration.
|
// configuration.
|
||||||
//
|
//
|
||||||
// This function may set exported fields to arbitrary values.
|
// This function may set serialized fields to arbitrary values.
|
||||||
func (s *Service) populateConfigOnBuild(cfg *Config) {
|
func (s *Service) populateConfigOnBuild(cfg *Config) {
|
||||||
cfg.Sources.RTMP.Enabled = true
|
cfg.Sources.MediaServer.StreamKey = "live"
|
||||||
cfg.Sources.RTMP.StreamKey = "live"
|
cfg.Sources.MediaServer.RTMP = RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: NetAddr{IP: "127.0.0.1", Port: 1935},
|
||||||
|
}
|
||||||
|
|
||||||
s.populateConfigOnRead(cfg)
|
s.populateConfigOnRead(cfg)
|
||||||
}
|
}
|
||||||
@ -191,7 +194,7 @@ func (s *Service) populateConfigOnBuild(cfg *Config) {
|
|||||||
// populateConfigOnRead is called to set default values for a configuration
|
// populateConfigOnRead is called to set default values for a configuration
|
||||||
// read from an existing file.
|
// read from an existing file.
|
||||||
//
|
//
|
||||||
// This function should not update any exported values, which would be a
|
// This function should not update any serialized values, which would be a
|
||||||
// confusing experience for the user.
|
// confusing experience for the user.
|
||||||
func (s *Service) populateConfigOnRead(cfg *Config) {
|
func (s *Service) populateConfigOnRead(cfg *Config) {
|
||||||
cfg.LogFile.defaultPath = filepath.Join(s.appStateDir, "octoplex.log")
|
cfg.LogFile.defaultPath = filepath.Join(s.appStateDir, "octoplex.log")
|
||||||
|
@ -19,6 +19,9 @@ import (
|
|||||||
//go:embed testdata/complete.yml
|
//go:embed testdata/complete.yml
|
||||||
var configComplete []byte
|
var configComplete []byte
|
||||||
|
|
||||||
|
//go:embed testdata/rtmps-only.yml
|
||||||
|
var configRTMPSOnly []byte
|
||||||
|
|
||||||
//go:embed testdata/logfile.yml
|
//go:embed testdata/logfile.yml
|
||||||
var configLogfile []byte
|
var configLogfile []byte
|
||||||
|
|
||||||
@ -44,7 +47,9 @@ func TestConfigServiceCurrent(t *testing.T) {
|
|||||||
t.Cleanup(func() { require.NoError(t, os.RemoveAll(systemConfigDir)) })
|
t.Cleanup(func() { require.NoError(t, os.RemoveAll(systemConfigDir)) })
|
||||||
|
|
||||||
// Ensure defaults are set:
|
// Ensure defaults are set:
|
||||||
assert.True(t, service.Current().Sources.RTMP.Enabled)
|
assert.NotNil(t, service.Current().Sources.MediaServer.RTMP)
|
||||||
|
assert.Equal(t, "127.0.0.1", service.Current().Sources.MediaServer.RTMP.IP)
|
||||||
|
assert.Equal(t, 1935, service.Current().Sources.MediaServer.RTMP.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConfigServiceCreateConfig(t *testing.T) {
|
func TestConfigServiceCreateConfig(t *testing.T) {
|
||||||
@ -67,7 +72,9 @@ func TestConfigServiceCreateConfig(t *testing.T) {
|
|||||||
|
|
||||||
var readCfg config.Config
|
var readCfg config.Config
|
||||||
require.NoError(t, yaml.Unmarshal(cfgBytes, &readCfg))
|
require.NoError(t, yaml.Unmarshal(cfgBytes, &readCfg))
|
||||||
assert.True(t, readCfg.Sources.RTMP.Enabled, "default values not set")
|
assert.NotNil(t, readCfg.Sources.MediaServer.RTMP)
|
||||||
|
assert.Equal(t, "127.0.0.1", readCfg.Sources.MediaServer.RTMP.IP)
|
||||||
|
assert.Equal(t, 1935, readCfg.Sources.MediaServer.RTMP.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConfigServiceReadConfig(t *testing.T) {
|
func TestConfigServiceReadConfig(t *testing.T) {
|
||||||
@ -90,15 +97,24 @@ func TestConfigServiceReadConfig(t *testing.T) {
|
|||||||
Path: "test.log",
|
Path: "test.log",
|
||||||
},
|
},
|
||||||
Sources: config.Sources{
|
Sources: config.Sources{
|
||||||
RTMP: config.RTMPSource{
|
MediaServer: config.MediaServerSource{
|
||||||
Enabled: true,
|
|
||||||
StreamKey: "s3cr3t",
|
StreamKey: "s3cr3t",
|
||||||
Host: "rtmp.example.com",
|
Host: "rtmp.example.com",
|
||||||
BindAddr: config.NetAddr{
|
RTMP: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: config.NetAddr{
|
||||||
IP: "0.0.0.0",
|
IP: "0.0.0.0",
|
||||||
Port: 19350,
|
Port: 19350,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
RTMPS: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: config.NetAddr{
|
||||||
|
IP: "0.0.0.0",
|
||||||
|
Port: 19443,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Destinations: []config.Destination{
|
Destinations: []config.Destination{
|
||||||
{
|
{
|
||||||
@ -113,6 +129,33 @@ func TestConfigServiceReadConfig(t *testing.T) {
|
|||||||
)
|
)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "RTMPS only",
|
||||||
|
configBytes: configRTMPSOnly,
|
||||||
|
want: func(t *testing.T, cfg config.Config) {
|
||||||
|
require.Empty(
|
||||||
|
t,
|
||||||
|
gocmp.Diff(
|
||||||
|
config.Config{
|
||||||
|
LogFile: config.LogFile{Enabled: true},
|
||||||
|
Sources: config.Sources{
|
||||||
|
MediaServer: config.MediaServerSource{
|
||||||
|
RTMPS: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: config.NetAddr{
|
||||||
|
IP: "0.0.0.0",
|
||||||
|
Port: 1935,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
cfg,
|
||||||
|
cmpopts.IgnoreUnexported(config.LogFile{}),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "logging enabled, logfile",
|
name: "logging enabled, logfile",
|
||||||
configBytes: configLogfile,
|
configBytes: configLogfile,
|
||||||
|
10
internal/config/testdata/complete.yml
vendored
10
internal/config/testdata/complete.yml
vendored
@ -3,13 +3,17 @@ logfile:
|
|||||||
enabled: true
|
enabled: true
|
||||||
path: test.log
|
path: test.log
|
||||||
sources:
|
sources:
|
||||||
rtmp:
|
mediaServer:
|
||||||
enabled: true
|
|
||||||
streamKey: s3cr3t
|
streamKey: s3cr3t
|
||||||
host: rtmp.example.com
|
host: rtmp.example.com
|
||||||
bindAddr:
|
rtmp:
|
||||||
|
enabled: true
|
||||||
ip: 0.0.0.0
|
ip: 0.0.0.0
|
||||||
port: 19350
|
port: 19350
|
||||||
|
rtmps:
|
||||||
|
enabled: true
|
||||||
|
ip: 0.0.0.0
|
||||||
|
port: 19443
|
||||||
destinations:
|
destinations:
|
||||||
- name: my stream
|
- name: my stream
|
||||||
url: rtmp://rtmp.example.com:1935/live
|
url: rtmp://rtmp.example.com:1935/live
|
||||||
|
9
internal/config/testdata/rtmps-only.yml
vendored
Normal file
9
internal/config/testdata/rtmps-only.yml
vendored
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
---
|
||||||
|
logfile:
|
||||||
|
enabled: true
|
||||||
|
sources:
|
||||||
|
mediaServer:
|
||||||
|
rtmps:
|
||||||
|
enabled: true
|
||||||
|
ip: 0.0.0.0
|
||||||
|
port: 1935
|
@ -35,7 +35,6 @@ type Source struct {
|
|||||||
Live bool
|
Live bool
|
||||||
LiveChangedAt time.Time
|
LiveChangedAt time.Time
|
||||||
Tracks []string
|
Tracks []string
|
||||||
RTMPURL string
|
|
||||||
ExitReason string
|
ExitReason string
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,6 +61,11 @@ type NetAddr struct {
|
|||||||
Port int
|
Port int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsZero returns true if the NetAddr is zero value.
|
||||||
|
func (n NetAddr) IsZero() bool {
|
||||||
|
return n.IP == "" && n.Port == 0
|
||||||
|
}
|
||||||
|
|
||||||
// Container status strings.
|
// Container status strings.
|
||||||
//
|
//
|
||||||
// TODO: refactor to strictly reflect Docker status strings.
|
// TODO: refactor to strictly reflect Docker status strings.
|
||||||
|
@ -31,3 +31,12 @@ func TestAppStateClone(t *testing.T) {
|
|||||||
s.Destinations[0].Name = "Twitch"
|
s.Destinations[0].Name = "Twitch"
|
||||||
assert.Equal(t, "YouTube", s2.Destinations[0].Name)
|
assert.Equal(t, "YouTube", s2.Destinations[0].Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNetAddr(t *testing.T) {
|
||||||
|
var addr domain.NetAddr
|
||||||
|
assert.True(t, addr.IsZero())
|
||||||
|
|
||||||
|
addr.IP = "127.0.0.1"
|
||||||
|
addr.Port = 3000
|
||||||
|
assert.False(t, addr.IsZero())
|
||||||
|
}
|
||||||
|
@ -30,7 +30,8 @@ const (
|
|||||||
defaultAPIPort = 9997 // default API host port for the media server
|
defaultAPIPort = 9997 // default API host port for the media server
|
||||||
defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security
|
defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security
|
||||||
defaultRTMPPort = 1935 // default RTMP host port for the media server
|
defaultRTMPPort = 1935 // default RTMP host port for the media server
|
||||||
defaultRTMPHost = "localhost" // default RTMP host name, used for the RTMP URL
|
defaultRTMPSPort = 1936 // default RTMPS host port for the media server
|
||||||
|
defaultHost = "localhost" // default mediaserver host name
|
||||||
defaultChanSize = 64 // default channel size for asynchronous non-error channels
|
defaultChanSize = 64 // default channel size for asynchronous non-error channels
|
||||||
imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
|
imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
|
||||||
defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey].
|
defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey].
|
||||||
@ -47,9 +48,10 @@ type Actor struct {
|
|||||||
stateC chan domain.Source
|
stateC chan domain.Source
|
||||||
chanSize int
|
chanSize int
|
||||||
containerClient *container.Client
|
containerClient *container.Client
|
||||||
apiPort int
|
|
||||||
rtmpAddr domain.NetAddr
|
rtmpAddr domain.NetAddr
|
||||||
rtmpHost string
|
rtmpsAddr domain.NetAddr
|
||||||
|
apiPort int
|
||||||
|
host string
|
||||||
streamKey StreamKey
|
streamKey StreamKey
|
||||||
updateStateInterval time.Duration
|
updateStateInterval time.Duration
|
||||||
pass string // password for the media server
|
pass string // password for the media server
|
||||||
@ -64,9 +66,10 @@ type Actor struct {
|
|||||||
// NewActorParams contains the parameters for building a new media server
|
// NewActorParams contains the parameters for building a new media server
|
||||||
// actor.
|
// actor.
|
||||||
type NewActorParams struct {
|
type NewActorParams struct {
|
||||||
|
RTMPAddr OptionalNetAddr // defaults to disabled, or 127.0.0.1:1935
|
||||||
|
RTMPSAddr OptionalNetAddr // defaults to disabled, or 127.0.0.1:1936
|
||||||
APIPort int // defaults to 9997
|
APIPort int // defaults to 9997
|
||||||
RTMPAddr domain.NetAddr // defaults to 127.0.0.1:1935
|
Host string // defaults to "localhost"
|
||||||
RTMPHost string // defaults to "localhost"
|
|
||||||
StreamKey StreamKey // defaults to "live"
|
StreamKey StreamKey // defaults to "live"
|
||||||
ChanSize int // defaults to 64
|
ChanSize int // defaults to 64
|
||||||
UpdateStateInterval time.Duration // defaults to 5 seconds
|
UpdateStateInterval time.Duration // defaults to 5 seconds
|
||||||
@ -74,6 +77,14 @@ type NewActorParams struct {
|
|||||||
Logger *slog.Logger
|
Logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OptionalNetAddr is a wrapper around domain.NetAddr that indicates whether it
|
||||||
|
// is enabled or not.
|
||||||
|
type OptionalNetAddr struct {
|
||||||
|
domain.NetAddr
|
||||||
|
|
||||||
|
Enabled bool
|
||||||
|
}
|
||||||
|
|
||||||
// NewActor creates a new media server actor.
|
// NewActor creates a new media server actor.
|
||||||
//
|
//
|
||||||
// Callers must consume the state channel exposed via [C].
|
// Callers must consume the state channel exposed via [C].
|
||||||
@ -87,15 +98,12 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error)
|
|||||||
return nil, fmt.Errorf("build API client: %w", err)
|
return nil, fmt.Errorf("build API client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rtmpAddr := params.RTMPAddr
|
|
||||||
rtmpAddr.IP = cmp.Or(rtmpAddr.IP, defaultRTMPIP)
|
|
||||||
rtmpAddr.Port = cmp.Or(rtmpAddr.Port, defaultRTMPPort)
|
|
||||||
|
|
||||||
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
|
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
|
||||||
return &Actor{
|
return &Actor{
|
||||||
|
rtmpAddr: toRTMPAddr(params.RTMPAddr, defaultRTMPPort),
|
||||||
|
rtmpsAddr: toRTMPAddr(params.RTMPSAddr, defaultRTMPSPort),
|
||||||
apiPort: cmp.Or(params.APIPort, defaultAPIPort),
|
apiPort: cmp.Or(params.APIPort, defaultAPIPort),
|
||||||
rtmpAddr: rtmpAddr,
|
host: cmp.Or(params.Host, defaultHost),
|
||||||
rtmpHost: cmp.Or(params.RTMPHost, defaultRTMPHost),
|
|
||||||
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
|
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
|
||||||
updateStateInterval: cmp.Or(params.UpdateStateInterval, defaultUpdateStateInterval),
|
updateStateInterval: cmp.Or(params.UpdateStateInterval, defaultUpdateStateInterval),
|
||||||
tlsCert: tlsCert,
|
tlsCert: tlsCert,
|
||||||
@ -112,56 +120,37 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *Actor) Start(ctx context.Context) error {
|
func (a *Actor) Start(ctx context.Context) error {
|
||||||
apiPortSpec := nat.Port(fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort))
|
var portSpecs []string
|
||||||
rtmpPortSpec := nat.Port(fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935))
|
portSpecs = append(portSpecs, fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort))
|
||||||
exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)})
|
if !a.rtmpAddr.IsZero() {
|
||||||
|
portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935))
|
||||||
// The RTMP URL is passed to the UI via the state.
|
}
|
||||||
// This could be refactored, it's not really stateful data.
|
if !a.rtmpsAddr.IsZero() {
|
||||||
a.state.RTMPURL = a.RTMPURL()
|
portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpsAddr.IP, a.rtmpsAddr.Port, 1936))
|
||||||
|
}
|
||||||
cfg, err := yaml.Marshal(
|
exposedPorts, portBindings, err := nat.ParsePortSpecs(portSpecs)
|
||||||
Config{
|
if err != nil {
|
||||||
LogLevel: "info",
|
return fmt.Errorf("parse port specs: %w", err)
|
||||||
LogDestinations: []string{"stdout"},
|
|
||||||
AuthMethod: "internal",
|
|
||||||
AuthInternalUsers: []User{
|
|
||||||
{
|
|
||||||
User: "any",
|
|
||||||
IPs: []string{}, // any IP
|
|
||||||
Permissions: []UserPermission{
|
|
||||||
{Action: "publish"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
User: "api",
|
|
||||||
Pass: a.pass,
|
|
||||||
IPs: []string{}, // any IP
|
|
||||||
Permissions: []UserPermission{
|
|
||||||
{Action: "read"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
User: "api",
|
|
||||||
Pass: a.pass,
|
|
||||||
IPs: []string{}, // any IP
|
|
||||||
Permissions: []UserPermission{{Action: "api"}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
API: true,
|
|
||||||
APIEncryption: true,
|
|
||||||
APIServerCert: "/etc/tls.crt",
|
|
||||||
APIServerKey: "/etc/tls.key",
|
|
||||||
Paths: map[string]Path{
|
|
||||||
string(a.streamKey): {Source: "publisher"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil { // should never happen
|
|
||||||
return fmt.Errorf("marshal config: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
a.logger.Info("Starting media server", "host", a.rtmpHost, "bind_ip", a.rtmpAddr.IP, "bind_port", a.rtmpAddr.Port)
|
cfg, err := a.buildServerConfig()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("build server config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
args := []any{"host", a.host}
|
||||||
|
if a.rtmpAddr.IsZero() {
|
||||||
|
args = append(args, "rtmp.enabled", false)
|
||||||
|
} else {
|
||||||
|
args = append(args, "rtmp.enabled", true, "rtmp.bind_addr", a.rtmpAddr.IP, "rtmp.bind_port", a.rtmpAddr.Port)
|
||||||
|
}
|
||||||
|
if a.rtmpsAddr.IsZero() {
|
||||||
|
args = append(args, "rtmps.enabled", false)
|
||||||
|
} else {
|
||||||
|
args = append(args, "rtmps.enabled", true, "rtmps.bind_addr", a.rtmpsAddr.IP, "rtmps.bind_port", a.rtmpsAddr.Port)
|
||||||
|
}
|
||||||
|
a.logger.Info("Starting media server", args...)
|
||||||
|
|
||||||
containerStateC, errC := a.containerClient.RunContainer(
|
containerStateC, errC := a.containerClient.RunContainer(
|
||||||
ctx,
|
ctx,
|
||||||
container.RunContainerParams{
|
container.RunContainerParams{
|
||||||
@ -224,6 +213,62 @@ func (a *Actor) Start(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Actor) buildServerConfig() ([]byte, error) {
|
||||||
|
// NOTE: Regardless of the user configuration (which mostly affects exposed
|
||||||
|
// ports and UI rendering) plain RTMP must be enabled at the container level,
|
||||||
|
// for internal connections.
|
||||||
|
var encryptionString string
|
||||||
|
if a.rtmpsAddr.IsZero() {
|
||||||
|
encryptionString = "no"
|
||||||
|
} else {
|
||||||
|
encryptionString = "optional"
|
||||||
|
}
|
||||||
|
|
||||||
|
return yaml.Marshal(
|
||||||
|
Config{
|
||||||
|
LogLevel: "debug",
|
||||||
|
LogDestinations: []string{"stdout"},
|
||||||
|
AuthMethod: "internal",
|
||||||
|
AuthInternalUsers: []User{
|
||||||
|
{
|
||||||
|
User: "any",
|
||||||
|
IPs: []string{}, // any IP
|
||||||
|
Permissions: []UserPermission{
|
||||||
|
{Action: "publish"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
User: "api",
|
||||||
|
Pass: a.pass,
|
||||||
|
IPs: []string{}, // any IP
|
||||||
|
Permissions: []UserPermission{
|
||||||
|
{Action: "read"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
User: "api",
|
||||||
|
Pass: a.pass,
|
||||||
|
IPs: []string{}, // any IP
|
||||||
|
Permissions: []UserPermission{{Action: "api"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
RTMP: true,
|
||||||
|
RTMPEncryption: encryptionString,
|
||||||
|
RTMPAddress: ":1935",
|
||||||
|
RTMPSAddress: ":1936",
|
||||||
|
RTMPServerCert: "/etc/tls.crt", // TODO: custom certs
|
||||||
|
RTMPServerKey: "/etc/tls.key", // TODO: custom certs
|
||||||
|
API: true,
|
||||||
|
APIEncryption: true,
|
||||||
|
APIServerCert: "/etc/tls.crt",
|
||||||
|
APIServerKey: "/etc/tls.key",
|
||||||
|
Paths: map[string]Path{
|
||||||
|
string(a.streamKey): {Source: "publisher"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// C returns a channel that will receive the current state of the media server.
|
// C returns a channel that will receive the current state of the media server.
|
||||||
func (s *Actor) C() <-chan domain.Source {
|
func (s *Actor) C() <-chan domain.Source {
|
||||||
return s.stateC
|
return s.stateC
|
||||||
@ -329,7 +374,20 @@ func (s *Actor) handleContainerExit(err error) {
|
|||||||
|
|
||||||
// RTMPURL returns the RTMP URL for the media server, accessible from the host.
|
// RTMPURL returns the RTMP URL for the media server, accessible from the host.
|
||||||
func (s *Actor) RTMPURL() string {
|
func (s *Actor) RTMPURL() string {
|
||||||
return fmt.Sprintf("rtmp://%s:%d/%s", s.rtmpHost, s.rtmpAddr.Port, s.streamKey)
|
if s.rtmpAddr.IsZero() {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("rtmp://%s:%d/%s", s.host, s.rtmpAddr.Port, s.streamKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RTMPSURL returns the RTMPS URL for the media server, accessible from the host.
|
||||||
|
func (s *Actor) RTMPSURL() string {
|
||||||
|
if s.rtmpsAddr.IsZero() {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("rtmps://%s:%d/%s", s.host, s.rtmpsAddr.Port, s.streamKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RTMPInternalURL returns the RTMP URL for the media server, accessible from
|
// RTMPInternalURL returns the RTMP URL for the media server, accessible from
|
||||||
@ -367,3 +425,17 @@ func generatePassword() string {
|
|||||||
_, _ = rand.Read(p)
|
_, _ = rand.Read(p)
|
||||||
return fmt.Sprintf("%x", []byte(p))
|
return fmt.Sprintf("%x", []byte(p))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// toRTMPAddr builds a domain.NetAddr from an OptionalNetAddr, with default
|
||||||
|
// values set to RTMP default bind config if needed. If the OptionalNetAddr is
|
||||||
|
// not enabled, a zero value is returned.
|
||||||
|
func toRTMPAddr(a OptionalNetAddr, defaultPort int) domain.NetAddr {
|
||||||
|
if !a.Enabled {
|
||||||
|
return domain.NetAddr{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return domain.NetAddr{
|
||||||
|
IP: cmp.Or(a.IP, defaultRTMPIP),
|
||||||
|
Port: cmp.Or(a.Port, defaultPort),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -17,8 +17,12 @@ type Config struct {
|
|||||||
APIEncryption bool `yaml:"apiEncryption,omitempty"`
|
APIEncryption bool `yaml:"apiEncryption,omitempty"`
|
||||||
APIServerCert string `yaml:"apiServerCert,omitempty"`
|
APIServerCert string `yaml:"apiServerCert,omitempty"`
|
||||||
APIServerKey string `yaml:"apiServerKey,omitempty"`
|
APIServerKey string `yaml:"apiServerKey,omitempty"`
|
||||||
RTMP bool `yaml:"rtmp,omitempty"`
|
RTMP bool `yaml:"rtmp"`
|
||||||
|
RTMPEncryption string `yaml:"rtmpEncryption,omitempty"`
|
||||||
RTMPAddress string `yaml:"rtmpAddress,omitempty"`
|
RTMPAddress string `yaml:"rtmpAddress,omitempty"`
|
||||||
|
RTMPSAddress string `yaml:"rtmpsAddress,omitempty"`
|
||||||
|
RTMPServerCert string `yaml:"rtmpServerCert,omitempty"`
|
||||||
|
RTMPServerKey string `yaml:"rtmpServerKey,omitempty"`
|
||||||
HLS bool `yaml:"hls"`
|
HLS bool `yaml:"hls"`
|
||||||
RTSP bool `yaml:"rtsp"`
|
RTSP bool `yaml:"rtsp"`
|
||||||
WebRTC bool `yaml:"webrtc"`
|
WebRTC bool `yaml:"webrtc"`
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type sourceViews struct {
|
type sourceViews struct {
|
||||||
url *tview.TextView
|
|
||||||
status *tview.TextView
|
status *tview.TextView
|
||||||
tracks *tview.TextView
|
tracks *tview.TextView
|
||||||
health *tview.TextView
|
health *tview.TextView
|
||||||
@ -44,6 +43,7 @@ type UI struct {
|
|||||||
commandC chan Command
|
commandC chan Command
|
||||||
clipboardAvailable bool
|
clipboardAvailable bool
|
||||||
configFilePath string
|
configFilePath string
|
||||||
|
rtmpURL, rtmpsURL string
|
||||||
buildInfo domain.BuildInfo
|
buildInfo domain.BuildInfo
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
|
|
||||||
@ -57,6 +57,7 @@ type UI struct {
|
|||||||
sourceViews sourceViews
|
sourceViews sourceViews
|
||||||
destView *tview.Table
|
destView *tview.Table
|
||||||
noDestView *tview.TextView
|
noDestView *tview.TextView
|
||||||
|
aboutView *tview.Flex
|
||||||
pullProgressModal *tview.Modal
|
pullProgressModal *tview.Modal
|
||||||
|
|
||||||
// other mutable state
|
// other mutable state
|
||||||
@ -127,8 +128,8 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
sourceView := tview.NewFlex()
|
sourceView := tview.NewFlex()
|
||||||
sourceView.SetDirection(tview.FlexColumn)
|
sourceView.SetDirection(tview.FlexColumn)
|
||||||
sourceView.SetBorder(true)
|
sourceView.SetBorder(true)
|
||||||
sourceView.SetTitle("Source RTMP server")
|
sourceView.SetTitle("Source")
|
||||||
sidebar.AddItem(sourceView, 9, 0, false)
|
sidebar.AddItem(sourceView, 8, 0, false)
|
||||||
|
|
||||||
leftCol := tview.NewFlex()
|
leftCol := tview.NewFlex()
|
||||||
leftCol.SetDirection(tview.FlexRow)
|
leftCol.SetDirection(tview.FlexRow)
|
||||||
@ -137,11 +138,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
sourceView.AddItem(leftCol, 9, 0, false)
|
sourceView.AddItem(leftCol, 9, 0, false)
|
||||||
sourceView.AddItem(rightCol, 0, 1, false)
|
sourceView.AddItem(rightCol, 0, 1, false)
|
||||||
|
|
||||||
urlHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerURL)
|
|
||||||
leftCol.AddItem(urlHeaderTextView, 1, 0, false)
|
|
||||||
urlTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
|
|
||||||
rightCol.AddItem(urlTextView, 1, 0, false)
|
|
||||||
|
|
||||||
statusHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerStatus)
|
statusHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerStatus)
|
||||||
leftCol.AddItem(statusHeaderTextView, 1, 0, false)
|
leftCol.AddItem(statusHeaderTextView, 1, 0, false)
|
||||||
statusTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
|
statusTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
|
||||||
@ -176,13 +172,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
aboutView.SetDirection(tview.FlexRow)
|
aboutView.SetDirection(tview.FlexRow)
|
||||||
aboutView.SetBorder(true)
|
aboutView.SetBorder(true)
|
||||||
aboutView.SetTitle("Actions")
|
aboutView.SetTitle("Actions")
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]a[-] Add destination"), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Del[-] Remove destination"), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Space[-] Start/stop destination"), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText(""), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]u[-] Copy source RTMP URL"), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
|
|
||||||
|
|
||||||
sidebar.AddItem(aboutView, 0, 1, false)
|
sidebar.AddItem(aboutView, 0, 1, false)
|
||||||
|
|
||||||
@ -232,7 +221,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
pages: pages,
|
pages: pages,
|
||||||
container: container,
|
container: container,
|
||||||
sourceViews: sourceViews{
|
sourceViews: sourceViews{
|
||||||
url: urlTextView,
|
|
||||||
status: statusTextView,
|
status: statusTextView,
|
||||||
tracks: tracksTextView,
|
tracks: tracksTextView,
|
||||||
health: healthTextView,
|
health: healthTextView,
|
||||||
@ -242,6 +230,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
},
|
},
|
||||||
destView: destView,
|
destView: destView,
|
||||||
noDestView: noDestView,
|
noDestView: noDestView,
|
||||||
|
aboutView: aboutView,
|
||||||
pullProgressModal: pullProgressModal,
|
pullProgressModal: pullProgressModal,
|
||||||
urlsToStartState: make(map[string]startState),
|
urlsToStartState: make(map[string]startState),
|
||||||
}
|
}
|
||||||
@ -254,6 +243,30 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
return ui, nil
|
return ui, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ui *UI) renderAboutView() {
|
||||||
|
ui.aboutView.Clear()
|
||||||
|
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]a[-] Add destination"), 1, 0, false)
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Del[-] Remove destination"), 1, 0, false)
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Space[-] Start/stop destination"), 1, 0, false)
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText(""), 1, 0, false)
|
||||||
|
|
||||||
|
i := 1
|
||||||
|
if ui.rtmpURL != "" {
|
||||||
|
rtmpURLView := tview.NewTextView().SetDynamicColors(true).SetText(fmt.Sprintf("[grey]F%d[-] Copy source RTMP URL", i))
|
||||||
|
ui.aboutView.AddItem(rtmpURLView, 1, 0, false)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
|
||||||
|
if ui.rtmpsURL != "" {
|
||||||
|
rtmpsURLView := tview.NewTextView().SetDynamicColors(true).SetText(fmt.Sprintf("[grey]F%d[-] Copy source RTMPS URL", i))
|
||||||
|
ui.aboutView.AddItem(rtmpsURLView, 1, 0, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
|
||||||
|
}
|
||||||
|
|
||||||
// C returns a channel that receives commands from the user interface.
|
// C returns a channel that receives commands from the user interface.
|
||||||
func (ui *UI) C() <-chan Command {
|
func (ui *UI) C() <-chan Command {
|
||||||
return ui.commandC
|
return ui.commandC
|
||||||
@ -283,6 +296,17 @@ func (ui *UI) run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetRTMPURLs sets the RTMP and RTMPS URLs for the user interface, which are
|
||||||
|
// unavailable when the UI is first created.
|
||||||
|
func (ui *UI) SetRTMPURLs(rtmpURL, rtmpsURL string) {
|
||||||
|
ui.mu.Lock()
|
||||||
|
ui.rtmpURL = rtmpURL
|
||||||
|
ui.rtmpsURL = rtmpsURL
|
||||||
|
ui.mu.Unlock()
|
||||||
|
|
||||||
|
ui.renderAboutView()
|
||||||
|
}
|
||||||
|
|
||||||
func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
||||||
// Special case: handle CTRL-C even when a modal is visible.
|
// Special case: handle CTRL-C even when a modal is visible.
|
||||||
if event.Key() == tcell.KeyCtrlC {
|
if event.Key() == tcell.KeyCtrlC {
|
||||||
@ -309,8 +333,6 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
|||||||
return nil
|
return nil
|
||||||
case ' ':
|
case ' ':
|
||||||
ui.toggleDestination()
|
ui.toggleDestination()
|
||||||
case 'u', 'U':
|
|
||||||
ui.copySourceURLToClipboard(ui.clipboardAvailable)
|
|
||||||
case 'c', 'C':
|
case 'c', 'C':
|
||||||
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
|
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
|
||||||
case '?':
|
case '?':
|
||||||
@ -318,6 +340,8 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
|||||||
case 'k': // tview vim bindings
|
case 'k': // tview vim bindings
|
||||||
handleKeyUp()
|
handleKeyUp()
|
||||||
}
|
}
|
||||||
|
case tcell.KeyF1, tcell.KeyF2:
|
||||||
|
ui.fkeyHandler(event.Key())
|
||||||
case tcell.KeyDelete, tcell.KeyBackspace, tcell.KeyBackspace2:
|
case tcell.KeyDelete, tcell.KeyBackspace, tcell.KeyBackspace2:
|
||||||
ui.removeDestination()
|
ui.removeDestination()
|
||||||
return nil
|
return nil
|
||||||
@ -328,11 +352,34 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
|||||||
return event
|
return event
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ui *UI) fkeyHandler(key tcell.Key) {
|
||||||
|
var urls []string
|
||||||
|
if ui.rtmpURL != "" {
|
||||||
|
urls = append(urls, ui.rtmpURL)
|
||||||
|
}
|
||||||
|
if ui.rtmpsURL != "" {
|
||||||
|
urls = append(urls, ui.rtmpsURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch key {
|
||||||
|
case tcell.KeyF1:
|
||||||
|
if len(urls) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ui.copySourceURLToClipboard(urls[0])
|
||||||
|
case tcell.KeyF2:
|
||||||
|
if len(urls) < 2 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ui.copySourceURLToClipboard(urls[1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (ui *UI) ShowSourceNotLiveModal() {
|
func (ui *UI) ShowSourceNotLiveModal() {
|
||||||
ui.app.QueueUpdateDraw(func() {
|
ui.app.QueueUpdateDraw(func() {
|
||||||
ui.showModal(
|
ui.showModal(
|
||||||
pageNameModalStartupCheck,
|
pageNameModalNotLive,
|
||||||
fmt.Sprintf("Waiting for stream.\nStart streaming to the source URL then try again:\n\n%s", ui.sourceViews.url.GetText(true)),
|
"Waiting for stream.\n\nStart streaming to a source URL then try again.",
|
||||||
[]string{"Ok"},
|
[]string{"Ok"},
|
||||||
false,
|
false,
|
||||||
nil,
|
nil,
|
||||||
@ -519,6 +566,7 @@ func (ui *UI) updateProgressModal(container domain.Container) {
|
|||||||
const (
|
const (
|
||||||
pageNameMain = "main"
|
pageNameMain = "main"
|
||||||
pageNameAddDestination = "add-destination"
|
pageNameAddDestination = "add-destination"
|
||||||
|
pageNameViewURLs = "view-urls"
|
||||||
pageNameConfigUpdateFailed = "modal-config-update-failed"
|
pageNameConfigUpdateFailed = "modal-config-update-failed"
|
||||||
pageNameNoDestinations = "no-destinations"
|
pageNameNoDestinations = "no-destinations"
|
||||||
pageNameModalAbout = "modal-about"
|
pageNameModalAbout = "modal-about"
|
||||||
@ -530,6 +578,7 @@ const (
|
|||||||
pageNameModalRemoveDestination = "modal-remove-destination"
|
pageNameModalRemoveDestination = "modal-remove-destination"
|
||||||
pageNameModalSourceError = "modal-source-error"
|
pageNameModalSourceError = "modal-source-error"
|
||||||
pageNameModalStartupCheck = "modal-startup-check"
|
pageNameModalStartupCheck = "modal-startup-check"
|
||||||
|
pageNameModalNotLive = "modal-not-live"
|
||||||
)
|
)
|
||||||
|
|
||||||
// modalVisible returns true if any modal, including the add destination form,
|
// modalVisible returns true if any modal, including the add destination form,
|
||||||
@ -696,8 +745,6 @@ func (ui *UI) redrawFromState(state domain.AppState) {
|
|||||||
SetSelectable(false)
|
SetSelectable(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
ui.sourceViews.url.SetText(cmp.Or(state.Source.RTMPURL, dash))
|
|
||||||
|
|
||||||
tracks := dash
|
tracks := dash
|
||||||
if state.Source.Live && len(state.Source.Tracks) > 0 {
|
if state.Source.Live && len(state.Source.Tracks) > 0 {
|
||||||
tracks = strings.Join(state.Source.Tracks, ", ")
|
tracks = strings.Join(state.Source.Tracks, ", ")
|
||||||
@ -971,13 +1018,12 @@ func (ui *UI) toggleDestination() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ui *UI) copySourceURLToClipboard(clipboardAvailable bool) {
|
func (ui *UI) copySourceURLToClipboard(url string) {
|
||||||
var text string
|
var text string
|
||||||
|
|
||||||
url := ui.sourceViews.url.GetText(true)
|
if ui.clipboardAvailable {
|
||||||
if clipboardAvailable {
|
|
||||||
clipboard.Write(clipboard.FmtText, []byte(url))
|
clipboard.Write(clipboard.FmtText, []byte(url))
|
||||||
text = "Source URL copied to clipboard:\n\n" + url
|
text = "URL copied to clipboard:\n\n" + url
|
||||||
} else {
|
} else {
|
||||||
text = "Copy to clipboard not available:\n\n" + url
|
text = "Copy to clipboard not available:\n\n" + url
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user