Compare commits
17 Commits
build/perm
...
main
Author | SHA1 | Date | |
---|---|---|---|
|
add511e3dd | ||
|
7afa84505e | ||
|
4a863a3212 | ||
|
98d93ad286 | ||
|
5f026be769 | ||
|
d35dedb15b | ||
|
e49bbb6800 | ||
|
c022c18a7f | ||
|
b147da6d9b | ||
|
e113d55044 | ||
|
0df42511ce | ||
|
f853a5cced | ||
|
781d535d38 | ||
|
f3a5b802b8 | ||
|
6646402273 | ||
|
f6c87c4568 | ||
|
2f980acbb3 |
@ -87,3 +87,4 @@ jobs:
|
|||||||
args: release --clean
|
args: release --clean
|
||||||
env:
|
env:
|
||||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||||
|
HOMEBREW_TOKEN: ${{ secrets.HOMEBREW_TAP_GITHUB_TOKEN }}
|
89
.github/workflows/codeql.yml
vendored
Normal file
89
.github/workflows/codeql.yml
vendored
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
name: ci-scan
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [ "main" ]
|
||||||
|
pull_request:
|
||||||
|
branches: [ "main" ]
|
||||||
|
schedule:
|
||||||
|
- cron: '36 23 * * 3'
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
analyze:
|
||||||
|
name: Analyze (${{ matrix.language }})
|
||||||
|
# Runner size impacts CodeQL analysis time. To learn more, please see:
|
||||||
|
# - https://gh.io/recommended-hardware-resources-for-running-codeql
|
||||||
|
# - https://gh.io/supported-runners-and-hardware-resources
|
||||||
|
# - https://gh.io/using-larger-runners (GitHub.com only)
|
||||||
|
# Consider using larger runners or machines with greater resources for possible analysis time improvements.
|
||||||
|
runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }}
|
||||||
|
permissions:
|
||||||
|
# required for all workflows
|
||||||
|
security-events: write
|
||||||
|
|
||||||
|
# required to fetch internal or private CodeQL packs
|
||||||
|
packages: read
|
||||||
|
|
||||||
|
# only required for workflows in private repositories
|
||||||
|
actions: read
|
||||||
|
contents: read
|
||||||
|
|
||||||
|
strategy:
|
||||||
|
fail-fast: false
|
||||||
|
matrix:
|
||||||
|
include:
|
||||||
|
- language: actions
|
||||||
|
build-mode: none
|
||||||
|
- language: go
|
||||||
|
build-mode: autobuild
|
||||||
|
# CodeQL supports the following values keywords for 'language': 'actions', 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift'
|
||||||
|
# Use `c-cpp` to analyze code written in C, C++ or both
|
||||||
|
# Use 'java-kotlin' to analyze code written in Java, Kotlin or both
|
||||||
|
# Use 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both
|
||||||
|
# To learn more about changing the languages that are analyzed or customizing the build mode for your analysis,
|
||||||
|
# see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/customizing-your-advanced-setup-for-code-scanning.
|
||||||
|
# If you are analyzing a compiled language, you can modify the 'build-mode' for that language to customize how
|
||||||
|
# your codebase is analyzed, see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/codeql-code-scanning-for-compiled-languages
|
||||||
|
steps:
|
||||||
|
- name: Checkout repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
# Add any setup steps before running the `github/codeql-action/init` action.
|
||||||
|
# This includes steps like installing compilers or runtimes (`actions/setup-node`
|
||||||
|
# or others). This is typically only required for manual builds.
|
||||||
|
# - name: Setup runtime (example)
|
||||||
|
# uses: actions/setup-example@v1
|
||||||
|
|
||||||
|
# Initializes the CodeQL tools for scanning.
|
||||||
|
- name: Initialize CodeQL
|
||||||
|
uses: github/codeql-action/init@v3
|
||||||
|
with:
|
||||||
|
languages: ${{ matrix.language }}
|
||||||
|
build-mode: ${{ matrix.build-mode }}
|
||||||
|
# If you wish to specify custom queries, you can do so here or in a config file.
|
||||||
|
# By default, queries listed here will override any specified in a config file.
|
||||||
|
# Prefix the list here with "+" to use these queries and those in the config file.
|
||||||
|
|
||||||
|
# For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
|
||||||
|
# queries: security-extended,security-and-quality
|
||||||
|
|
||||||
|
# If the analyze step fails for one of the languages you are analyzing with
|
||||||
|
# "We were unable to automatically build your code", modify the matrix above
|
||||||
|
# to set the build mode to "manual" for that language. Then modify this step
|
||||||
|
# to build your code.
|
||||||
|
# ℹ️ Command-line programs to run using the OS shell.
|
||||||
|
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
|
||||||
|
- if: matrix.build-mode == 'manual'
|
||||||
|
shell: bash
|
||||||
|
run: |
|
||||||
|
echo 'If you are using a "manual" build mode for one or more of the' \
|
||||||
|
'languages you are analyzing, replace this with the commands to build' \
|
||||||
|
'your code, for example:'
|
||||||
|
echo ' make bootstrap'
|
||||||
|
echo ' make release'
|
||||||
|
exit 1
|
||||||
|
|
||||||
|
- name: Perform CodeQL Analysis
|
||||||
|
uses: github/codeql-action/analyze@v3
|
||||||
|
with:
|
||||||
|
category: "/language:${{matrix.language}}"
|
@ -33,6 +33,7 @@ brews:
|
|||||||
repository:
|
repository:
|
||||||
owner: rfwatson
|
owner: rfwatson
|
||||||
name: homebrew-octoplex
|
name: homebrew-octoplex
|
||||||
|
token: "{{ .Env.HOMEBREW_TOKEN }}"
|
||||||
install: |
|
install: |
|
||||||
bin.install "octoplex"
|
bin.install "octoplex"
|
||||||
test: |
|
test: |
|
||||||
|
43
CONTRIBUTING.md
Normal file
43
CONTRIBUTING.md
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
# Contributing
|
||||||
|
|
||||||
|
Thanks for contributing to Octoplex!
|
||||||
|
|
||||||
|
## Development
|
||||||
|
|
||||||
|
### Mise
|
||||||
|
|
||||||
|
Octoplex uses [mise](https://mise.jdx.dev/installing-mise.html) as a task
|
||||||
|
runner and environment management tool.
|
||||||
|
|
||||||
|
Once installed, you can run common development tasks easily:
|
||||||
|
|
||||||
|
Command|Shortcut|Description
|
||||||
|
---|---|---
|
||||||
|
`mise run test`|`mise run t`|Run unit tests
|
||||||
|
`mise run test_integration`|`mise run ti`|Run integration tests
|
||||||
|
`mise run lint`|`mise run l`|Run linter
|
||||||
|
`mise run format`|`mise run f`|Run formatter
|
||||||
|
`mise run generate_mocks`|`mise run m`|Re-generate mocks
|
||||||
|
|
||||||
|
### Tests
|
||||||
|
|
||||||
|
#### Integration tests
|
||||||
|
|
||||||
|
The integration tests (mostly in `/internal/app/integration_test.go`) attempt
|
||||||
|
to exercise the entire app, including launching containers and rendering the
|
||||||
|
terminal output.
|
||||||
|
|
||||||
|
Sometimes they can be flaky. Always ensure there are no stale Docker containers
|
||||||
|
present from previous runs, and that nothing is listening or attempting to
|
||||||
|
broadcast to localhost:1935 or localhost:1936.
|
||||||
|
|
||||||
|
## Opening a pull request
|
||||||
|
|
||||||
|
Pull requests are welcome, but please propose significant changes in a
|
||||||
|
[discussion](https://github.com/rfwatson/octoplex/discussions) first.
|
||||||
|
|
||||||
|
1. Fork the repo
|
||||||
|
2. Make your changes, including test coverage
|
||||||
|
3. Push the changes to a branch
|
||||||
|
4. Ensure the branch is passing
|
||||||
|
5. Open a pull request
|
26
README.md
26
README.md
@ -1,12 +1,13 @@
|
|||||||
# Octoplex :octopus:
|
# Octoplex :octopus:
|
||||||
|
|
||||||

|

|
||||||
|

|
||||||

|

|
||||||
[](https://www.gnu.org/licenses/agpl-3.0)
|
[](https://www.gnu.org/licenses/agpl-3.0)
|
||||||
|
|
||||||
Octoplex is a live video restreamer for the terminal.
|
Octoplex is a live video restreamer for the terminal.
|
||||||
|
|
||||||
* Restream RTMP to unlimited destinations
|
* Restream RTMP/RTMPS to unlimited destinations
|
||||||
* Broadcast using OBS and other standard tools
|
* Broadcast using OBS and other standard tools
|
||||||
* Add and remove destinations while streaming
|
* Add and remove destinations while streaming
|
||||||
* Automatic reconnections
|
* Automatic reconnections
|
||||||
@ -96,9 +97,20 @@ logfile:
|
|||||||
enabled: true # defaults to false
|
enabled: true # defaults to false
|
||||||
path: /path/to/logfile # defaults to $XDG_STATE_HOME/octoplex/octoplex.log
|
path: /path/to/logfile # defaults to $XDG_STATE_HOME/octoplex/octoplex.log
|
||||||
sources:
|
sources:
|
||||||
rtmp:
|
mediaServer:
|
||||||
enabled: true # must be true
|
|
||||||
streamKey: live # defaults to "live"
|
streamKey: live # defaults to "live"
|
||||||
|
host: rtmp.example.com # defaults to "localhost"
|
||||||
|
tls: # optional. If RTMPS is enabled, defaults to a
|
||||||
|
cert: /etc/mycert.pem # self-signed keypair corresponding to the host
|
||||||
|
key: /etc/mykey.pem # key.
|
||||||
|
rtmp:
|
||||||
|
enabled: true # defaults to false
|
||||||
|
ip: 127.0.0.1 # defaults to 127.0.0.1
|
||||||
|
port: 1935 # defaults to 1935
|
||||||
|
rtmps:
|
||||||
|
enabled: true # defaults to false
|
||||||
|
ip: 0.0.0.0 # defaults to 127.0.0.1
|
||||||
|
port: 1936 # defaults to 1936
|
||||||
destinations:
|
destinations:
|
||||||
- name: YouTube # Destination name, used only for display
|
- name: YouTube # Destination name, used only for display
|
||||||
url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key
|
url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key
|
||||||
@ -107,9 +119,13 @@ destinations:
|
|||||||
# other destinations here
|
# other destinations here
|
||||||
```
|
```
|
||||||
|
|
||||||
:warning: It is also possible to add and remove destinations directly from the
|
:information_source: It is also possible to add and remove destinations directly from the
|
||||||
terminal user interface.
|
terminal user interface.
|
||||||
|
|
||||||
|
:warning: `sources.mediaServer.rtmp.ip` must be set to a valid IP address if
|
||||||
|
you want to accept connections from other hosts. Leave it blank to bind only to
|
||||||
|
localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces.
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
### Bug reports
|
### Bug reports
|
||||||
|
11
SECURITY.md
Normal file
11
SECURITY.md
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
# Security
|
||||||
|
|
||||||
|
## Supported versions
|
||||||
|
|
||||||
|
Octoplex is currently alpha software. Security updates will be targeted to the
|
||||||
|
`main` branch.
|
||||||
|
|
||||||
|
## Reporting a vulnerability
|
||||||
|
|
||||||
|
Please report any vulnerability privately through GitHub:
|
||||||
|
https://github.com/rfwatson/octoplex/security
|
22
go.mod
22
go.mod
@ -3,12 +3,12 @@ module git.netflux.io/rob/octoplex
|
|||||||
go 1.24.0
|
go 1.24.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/docker/docker v28.0.1+incompatible
|
github.com/docker/docker v28.0.4+incompatible
|
||||||
github.com/docker/go-connections v0.5.0
|
github.com/docker/go-connections v0.5.0
|
||||||
github.com/gdamore/tcell/v2 v2.8.1
|
github.com/gdamore/tcell/v2 v2.8.1
|
||||||
github.com/google/go-cmp v0.7.0
|
github.com/google/go-cmp v0.7.0
|
||||||
github.com/opencontainers/image-spec v1.1.1
|
github.com/opencontainers/image-spec v1.1.1
|
||||||
github.com/rivo/tview v0.0.0-20241227133733-17b7edb88c57
|
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922
|
||||||
github.com/stretchr/testify v1.10.0
|
github.com/stretchr/testify v1.10.0
|
||||||
github.com/testcontainers/testcontainers-go v0.35.0
|
github.com/testcontainers/testcontainers-go v0.35.0
|
||||||
golang.design/x/clipboard v0.7.0
|
golang.design/x/clipboard v0.7.0
|
||||||
@ -88,18 +88,18 @@ require (
|
|||||||
go.opentelemetry.io/otel/metric v1.35.0 // indirect
|
go.opentelemetry.io/otel/metric v1.35.0 // indirect
|
||||||
go.opentelemetry.io/otel/trace v1.35.0 // indirect
|
go.opentelemetry.io/otel/trace v1.35.0 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
golang.org/x/crypto v0.32.0 // indirect
|
golang.org/x/crypto v0.37.0 // indirect
|
||||||
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac // indirect
|
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac // indirect
|
||||||
golang.org/x/exp/shiny v0.0.0-20250305212735-054e65f0b394 // indirect
|
golang.org/x/exp/shiny v0.0.0-20250408133849-7e4ce0ab07d0 // indirect
|
||||||
golang.org/x/image v0.25.0 // indirect
|
golang.org/x/image v0.26.0 // indirect
|
||||||
golang.org/x/mobile v0.0.0-20250305212854-3a7bc9f8a4de // indirect
|
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7 // indirect
|
||||||
golang.org/x/mod v0.24.0 // indirect
|
golang.org/x/mod v0.24.0 // indirect
|
||||||
golang.org/x/sync v0.12.0 // indirect
|
golang.org/x/sync v0.13.0 // indirect
|
||||||
golang.org/x/sys v0.31.0 // indirect
|
golang.org/x/sys v0.32.0 // indirect
|
||||||
golang.org/x/term v0.30.0 // indirect
|
golang.org/x/term v0.31.0 // indirect
|
||||||
golang.org/x/text v0.23.0 // indirect
|
golang.org/x/text v0.24.0 // indirect
|
||||||
golang.org/x/time v0.9.0 // indirect
|
golang.org/x/time v0.9.0 // indirect
|
||||||
golang.org/x/tools v0.31.0 // indirect
|
golang.org/x/tools v0.32.0 // indirect
|
||||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
|
48
go.sum
48
go.sum
@ -26,8 +26,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
|
|||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
|
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
|
||||||
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
|
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
|
||||||
github.com/docker/docker v28.0.1+incompatible h1:FCHjSRdXhNRFjlHMTv4jUNlIBbTeRjrWfeFuJp7jpo0=
|
github.com/docker/docker v28.0.4+incompatible h1:JNNkBctYKurkw6FrHfKqY0nKIDf5nrbxjVBtS+cdcok=
|
||||||
github.com/docker/docker v28.0.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
github.com/docker/docker v28.0.4+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
||||||
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
|
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
|
||||||
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
|
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
|
||||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||||
@ -125,8 +125,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
|
|||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
|
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
|
||||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
|
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
|
||||||
github.com/rivo/tview v0.0.0-20241227133733-17b7edb88c57 h1:LmsF7Fk5jyEDhJk0fYIqdWNuTxSyid2W42A0L2YWjGE=
|
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922 h1:SMyqkaRfpE8ZQUSRTZKO3uN84xov++OGa+e3NCksaQw=
|
||||||
github.com/rivo/tview v0.0.0-20241227133733-17b7edb88c57/go.mod h1:02iFIz7K/A9jGCvrizLPvoqr4cEIx7q54RH5Qudkrss=
|
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922/go.mod h1:02iFIz7K/A9jGCvrizLPvoqr4cEIx7q54RH5Qudkrss=
|
||||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||||
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
|
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
|
||||||
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
|
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
|
||||||
@ -219,16 +219,16 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
|
|||||||
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
|
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
|
||||||
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
|
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
|
||||||
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
|
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
|
||||||
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
|
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
|
||||||
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
|
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
|
||||||
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac h1:l5+whBCLH3iH2ZNHYLbAe58bo7yrN4mVcnkHDYz5vvs=
|
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac h1:l5+whBCLH3iH2ZNHYLbAe58bo7yrN4mVcnkHDYz5vvs=
|
||||||
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac/go.mod h1:hH+7mtFmImwwcMvScyxUhjuVHR3HGaDPMn9rMSUUbxo=
|
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac/go.mod h1:hH+7mtFmImwwcMvScyxUhjuVHR3HGaDPMn9rMSUUbxo=
|
||||||
golang.org/x/exp/shiny v0.0.0-20250305212735-054e65f0b394 h1:bFYqOIMdeiCEdzPJkLiOoMDzW/v3tjW4AA/RmUZYsL8=
|
golang.org/x/exp/shiny v0.0.0-20250408133849-7e4ce0ab07d0 h1:tMSqXTK+AQdW3LpCbfatHSRPHeW6+2WuxaVQuHftn80=
|
||||||
golang.org/x/exp/shiny v0.0.0-20250305212735-054e65f0b394/go.mod h1:ygj7T6vSGhhm/9yTpOQQNvuAUFziTH7RUiH74EoE2C8=
|
golang.org/x/exp/shiny v0.0.0-20250408133849-7e4ce0ab07d0/go.mod h1:ygj7T6vSGhhm/9yTpOQQNvuAUFziTH7RUiH74EoE2C8=
|
||||||
golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ=
|
golang.org/x/image v0.26.0 h1:4XjIFEZWQmCZi6Wv8BoxsDhRU3RVnLX04dToTDAEPlY=
|
||||||
golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs=
|
golang.org/x/image v0.26.0/go.mod h1:lcxbMFAovzpnJxzXS3nyL83K27tmqtKzIJpctK8YO5c=
|
||||||
golang.org/x/mobile v0.0.0-20250305212854-3a7bc9f8a4de h1:WuckfUoaRGJfaQTPZvlmcaQwg4Xj9oS2cvvh3dUqpDo=
|
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7 h1:8MGTx39304caZ/OMsjPfuxUoDGI2tRas92F5x97tIYc=
|
||||||
golang.org/x/mobile v0.0.0-20250305212854-3a7bc9f8a4de/go.mod h1:/IZuixag1ELW37+FftdmIt59/3esqpAWM/QqWtf7HUI=
|
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7/go.mod h1:ftACcHgQ7vaOnQbHOHvXt9Y6bEPHrs5Ovk67ClwrPJA=
|
||||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||||
@ -249,8 +249,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
|
|||||||
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
|
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
|
||||||
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
|
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
|
||||||
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
|
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
|
||||||
golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
|
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
|
||||||
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
|
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
|
||||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
@ -260,8 +260,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
|||||||
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
|
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
|
||||||
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
@ -283,8 +283,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
|||||||
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
|
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
|
||||||
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||||
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
|
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
|
||||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||||
@ -294,8 +294,8 @@ golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
|
|||||||
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
|
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
|
||||||
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
|
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
|
||||||
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
|
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
|
||||||
golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y=
|
golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o=
|
||||||
golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g=
|
golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw=
|
||||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||||
@ -305,8 +305,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
|||||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||||
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||||
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
|
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
|
||||||
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
|
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
|
||||||
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
|
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
|
||||||
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
|
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
|
||||||
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
@ -317,8 +317,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
|
|||||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||||
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
|
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
|
||||||
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
|
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
|
||||||
golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU=
|
golang.org/x/tools v0.32.0 h1:Q7N1vhpkQv7ybVzLFtTjvQya2ewbwNDZzUgfXGqtMWU=
|
||||||
golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ=
|
golang.org/x/tools v0.32.0/go.mod h1:ZxrU41P/wAbZD8EDa6dDCa6XfpkhJ7HFMjHJXfBDu8s=
|
||||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
@ -38,9 +38,9 @@ func Run(ctx context.Context, params RunParams) error {
|
|||||||
state := new(domain.AppState)
|
state := new(domain.AppState)
|
||||||
applyConfig(cfg, state)
|
applyConfig(cfg, state)
|
||||||
|
|
||||||
// While RTMP is the only source, it doesn't make sense to disable it.
|
// Ensure there is at least one active source.
|
||||||
if !cfg.Sources.RTMP.Enabled {
|
if !cfg.Sources.MediaServer.RTMP.Enabled && !cfg.Sources.MediaServer.RTMPS.Enabled {
|
||||||
return errors.New("config: sources.rtmp.enabled must be set to true")
|
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
|
||||||
}
|
}
|
||||||
|
|
||||||
logger := params.Logger
|
logger := params.Logger
|
||||||
@ -88,8 +88,19 @@ func Run(ctx context.Context, params RunParams) error {
|
|||||||
updateUI := func() { ui.SetState(*state) }
|
updateUI := func() { ui.SetState(*state) }
|
||||||
updateUI()
|
updateUI()
|
||||||
|
|
||||||
|
var tlsCertPath, tlsKeyPath string
|
||||||
|
if cfg.Sources.MediaServer.TLS != nil {
|
||||||
|
tlsCertPath = cfg.Sources.MediaServer.TLS.CertPath
|
||||||
|
tlsKeyPath = cfg.Sources.MediaServer.TLS.KeyPath
|
||||||
|
}
|
||||||
|
|
||||||
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
|
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
|
||||||
StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey),
|
RTMPAddr: buildNetAddr(cfg.Sources.MediaServer.RTMP),
|
||||||
|
RTMPSAddr: buildNetAddr(cfg.Sources.MediaServer.RTMPS),
|
||||||
|
Host: cfg.Sources.MediaServer.Host,
|
||||||
|
TLSCertPath: tlsCertPath,
|
||||||
|
TLSKeyPath: tlsKeyPath,
|
||||||
|
StreamKey: mediaserver.StreamKey(cfg.Sources.MediaServer.StreamKey),
|
||||||
ContainerClient: containerClient,
|
ContainerClient: containerClient,
|
||||||
Logger: logger.With("component", "mediaserver"),
|
Logger: logger.With("component", "mediaserver"),
|
||||||
})
|
})
|
||||||
@ -102,6 +113,10 @@ func Run(ctx context.Context, params RunParams) error {
|
|||||||
}
|
}
|
||||||
defer srv.Close()
|
defer srv.Close()
|
||||||
|
|
||||||
|
// Set the RTMP and RTMPS URLs in the UI, which are only known after the
|
||||||
|
// MediaServer is available.
|
||||||
|
ui.SetRTMPURLs(srv.RTMPURL(), srv.RTMPSURL())
|
||||||
|
|
||||||
repl := replicator.StartActor(ctx, replicator.StartActorParams{
|
repl := replicator.StartActor(ctx, replicator.StartActorParams{
|
||||||
SourceURL: srv.RTMPInternalURL(),
|
SourceURL: srv.RTMPInternalURL(),
|
||||||
ContainerClient: containerClient,
|
ContainerClient: containerClient,
|
||||||
@ -140,7 +155,7 @@ func Run(ctx context.Context, params RunParams) error {
|
|||||||
|
|
||||||
logger.Debug("Command received", "cmd", cmd.Name())
|
logger.Debug("Command received", "cmd", cmd.Name())
|
||||||
switch c := cmd.(type) {
|
switch c := cmd.(type) {
|
||||||
case terminal.CommandAddDestination:
|
case domain.CommandAddDestination:
|
||||||
newCfg := cfg
|
newCfg := cfg
|
||||||
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
|
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
|
||||||
Name: c.DestinationName,
|
Name: c.DestinationName,
|
||||||
@ -154,7 +169,7 @@ func Run(ctx context.Context, params RunParams) error {
|
|||||||
cfg = newCfg
|
cfg = newCfg
|
||||||
handleConfigUpdate(cfg, state, ui)
|
handleConfigUpdate(cfg, state, ui)
|
||||||
ui.DestinationAdded()
|
ui.DestinationAdded()
|
||||||
case terminal.CommandRemoveDestination:
|
case domain.CommandRemoveDestination:
|
||||||
repl.StopDestination(c.URL) // no-op if not live
|
repl.StopDestination(c.URL) // no-op if not live
|
||||||
newCfg := cfg
|
newCfg := cfg
|
||||||
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
|
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
|
||||||
@ -168,16 +183,16 @@ func Run(ctx context.Context, params RunParams) error {
|
|||||||
cfg = newCfg
|
cfg = newCfg
|
||||||
handleConfigUpdate(cfg, state, ui)
|
handleConfigUpdate(cfg, state, ui)
|
||||||
ui.DestinationRemoved()
|
ui.DestinationRemoved()
|
||||||
case terminal.CommandStartDestination:
|
case domain.CommandStartDestination:
|
||||||
if !state.Source.Live {
|
if !state.Source.Live {
|
||||||
ui.ShowSourceNotLiveModal()
|
ui.ShowSourceNotLiveModal()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
repl.StartDestination(c.URL)
|
repl.StartDestination(c.URL)
|
||||||
case terminal.CommandStopDestination:
|
case domain.CommandStopDestination:
|
||||||
repl.StopDestination(c.URL)
|
repl.StopDestination(c.URL)
|
||||||
case terminal.CommandQuit:
|
case domain.CommandQuit:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
case <-uiUpdateT.C:
|
case <-uiUpdateT.C:
|
||||||
@ -319,3 +334,12 @@ func doStartupCheck(ctx context.Context, containerClient *container.Client, show
|
|||||||
|
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config.
|
||||||
|
func buildNetAddr(src config.RTMPSource) mediaserver.OptionalNetAddr {
|
||||||
|
if !src.Enabled {
|
||||||
|
return mediaserver.OptionalNetAddr{Enabled: false}
|
||||||
|
}
|
||||||
|
|
||||||
|
return mediaserver.OptionalNetAddr{Enabled: true, NetAddr: domain.NetAddr(src.NetAddr)}
|
||||||
|
}
|
||||||
|
@ -85,6 +85,8 @@ func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- termina
|
|||||||
lines[y] += string(screenCells[n].Runes[0])
|
lines[y] += string(screenCells[n].Runes[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
require.GreaterOrEqual(t, len(lines), 5, "Screen contents should have at least 5 lines")
|
||||||
|
|
||||||
return lines
|
return lines
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,9 +5,13 @@ package app_test
|
|||||||
import (
|
import (
|
||||||
"cmp"
|
"cmp"
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"encoding/pem"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -29,13 +33,41 @@ import (
|
|||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const waitTime = time.Minute
|
||||||
|
|
||||||
func TestIntegration(t *testing.T) {
|
func TestIntegration(t *testing.T) {
|
||||||
t.Run("with default stream key", func(t *testing.T) {
|
t.Run("RTMP with default host, port and stream key", func(t *testing.T) {
|
||||||
testIntegration(t, "")
|
testIntegration(t, config.MediaServerSource{
|
||||||
|
RTMP: config.RTMPSource{Enabled: true},
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("with custom stream key", func(t *testing.T) {
|
t.Run("RTMPS with default host, port and stream key", func(t *testing.T) {
|
||||||
testIntegration(t, "s0meK3y")
|
testIntegration(t, config.MediaServerSource{
|
||||||
|
RTMPS: config.RTMPSource{Enabled: true},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("RTMP with custom host, port and stream key", func(t *testing.T) {
|
||||||
|
testIntegration(t, config.MediaServerSource{
|
||||||
|
StreamKey: "s0meK3y",
|
||||||
|
Host: "localhost",
|
||||||
|
RTMP: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: config.NetAddr{IP: "0.0.0.0", Port: 3000},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("RTMPS with custom host, port and stream key", func(t *testing.T) {
|
||||||
|
testIntegration(t, config.MediaServerSource{
|
||||||
|
StreamKey: "an0therK3y",
|
||||||
|
Host: "localhost",
|
||||||
|
RTMPS: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: config.NetAddr{IP: "0.0.0.0", Port: 443},
|
||||||
|
},
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,24 +77,39 @@ func TestIntegration(t *testing.T) {
|
|||||||
// https://stackoverflow.com/a/60740997/62871
|
// https://stackoverflow.com/a/60740997/62871
|
||||||
const hostIP = "172.17.0.1"
|
const hostIP = "172.17.0.1"
|
||||||
|
|
||||||
func testIntegration(t *testing.T, streamKey string) {
|
func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
|
||||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
wantStreamKey := cmp.Or(streamKey, "live")
|
var rtmpConfig config.RTMPSource
|
||||||
|
var proto string
|
||||||
|
var defaultPort int
|
||||||
|
if mediaServerConfig.RTMP.Enabled {
|
||||||
|
rtmpConfig = mediaServerConfig.RTMP
|
||||||
|
proto = "rtmp://"
|
||||||
|
defaultPort = 1935
|
||||||
|
} else {
|
||||||
|
rtmpConfig = mediaServerConfig.RTMPS
|
||||||
|
proto = "rtmps://"
|
||||||
|
defaultPort = 1936
|
||||||
|
}
|
||||||
|
|
||||||
|
wantHost := cmp.Or(mediaServerConfig.Host, "localhost")
|
||||||
|
wantRTMPPort := cmp.Or(rtmpConfig.Port, defaultPort)
|
||||||
|
wantStreamKey := cmp.Or(mediaServerConfig.StreamKey, "live")
|
||||||
|
wantRTMPURL := fmt.Sprintf("%s%s:%d/%s", proto, wantHost, wantRTMPPort, wantStreamKey)
|
||||||
|
|
||||||
destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
ContainerRequest: testcontainers.ContainerRequest{
|
||||||
Image: "bluenviron/mediamtx:latest",
|
Image: "bluenviron/mediamtx:latest",
|
||||||
Env: map[string]string{"MTX_RTMPADDRESS": ":1936"},
|
ExposedPorts: []string{"1935/tcp"},
|
||||||
ExposedPorts: []string{"1936/tcp"},
|
WaitingFor: wait.ForListeningPort("1935/tcp"),
|
||||||
WaitingFor: wait.ForListeningPort("1936/tcp"),
|
|
||||||
},
|
},
|
||||||
Started: true,
|
Started: true,
|
||||||
})
|
})
|
||||||
testcontainers.CleanupContainer(t, destServer)
|
testcontainers.CleanupContainer(t, destServer)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
destServerPort, err := destServer.MappedPort(ctx, "1936/tcp")
|
destServerPort, err := destServer.MappedPort(ctx, "1935/tcp")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
||||||
@ -71,10 +118,10 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
|
|
||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
destURL1 := fmt.Sprintf("rtmp://%s:%d/%s/dest1", hostIP, destServerPort.Int(), wantStreamKey)
|
destURL1 := fmt.Sprintf("rtmp://%s:%d/live/dest1", hostIP, destServerPort.Int())
|
||||||
destURL2 := fmt.Sprintf("rtmp://%s:%d/%s/dest2", hostIP, destServerPort.Int(), wantStreamKey)
|
destURL2 := fmt.Sprintf("rtmp://%s:%d/live/dest2", hostIP, destServerPort.Int())
|
||||||
configService := setupConfigService(t, config.Config{
|
configService := setupConfigService(t, config.Config{
|
||||||
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true, StreamKey: streamKey}},
|
Sources: config.Sources{MediaServer: mediaServerConfig},
|
||||||
// Load one destination from config, add the other in-app.
|
// Load one destination from config, add the other in-app.
|
||||||
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
|
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
|
||||||
})
|
})
|
||||||
@ -103,33 +150,30 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected the mediaserver to start",
|
"expected the mediaserver to start",
|
||||||
)
|
)
|
||||||
printScreen(t, getContents, "After starting the mediaserver")
|
printScreen(t, getContents, "After starting the mediaserver")
|
||||||
|
|
||||||
// Start streaming a test video to the app:
|
// Start streaming a test video to the app:
|
||||||
testhelpers.StreamFLV(t, "rtmp://localhost:1935/"+wantStreamKey)
|
testhelpers.StreamFLV(t, wantRTMPURL)
|
||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[1], "URL rtmp://localhost:1935/"+wantStreamKey, "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to receive an ingress stream",
|
"expected to receive an ingress stream",
|
||||||
)
|
)
|
||||||
@ -149,22 +193,21 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air")
|
assert.Contains(c, contents[3], "off-air", "expected local server 0 to be off-air")
|
||||||
|
|
||||||
require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present")
|
require.Contains(c, contents[3], "Local server 2", "expected local server 2 to be present")
|
||||||
assert.Contains(t, contents[3], "off-air", "expected local server 2 to be off-air")
|
assert.Contains(c, contents[3], "off-air", "expected local server 2 to be off-air")
|
||||||
|
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to add the destinations",
|
"expected to add the destinations",
|
||||||
)
|
)
|
||||||
@ -177,23 +220,22 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||||
|
|
||||||
require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present")
|
require.Contains(c, contents[3], "Local server 2", "expected local server 2 to be present")
|
||||||
assert.Contains(t, contents[3], "sending", "expected local server 2 to be sending")
|
assert.Contains(c, contents[3], "sending", "expected local server 2 to be sending")
|
||||||
assert.Contains(t, contents[3], "healthy", "expected local server 2 to be healthy")
|
assert.Contains(c, contents[3], "healthy", "expected local server 2 to be healthy")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to start the destination streams",
|
"expected to start the destination streams",
|
||||||
)
|
)
|
||||||
@ -205,22 +247,21 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264")
|
||||||
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||||
|
|
||||||
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||||
|
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to remove the second destination",
|
"expected to remove the second destination",
|
||||||
)
|
)
|
||||||
@ -231,16 +272,15 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited")
|
assert.Contains(c, contents[2], "exited", "expected local server 1 to have exited")
|
||||||
|
|
||||||
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to stop the first destination stream",
|
"expected to stop the first destination stream",
|
||||||
)
|
)
|
||||||
@ -256,6 +296,140 @@ func testIntegration(t *testing.T, streamKey string) {
|
|||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIntegrationCustomHost(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
||||||
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
configService := setupConfigService(t, config.Config{
|
||||||
|
Sources: config.Sources{
|
||||||
|
MediaServer: config.MediaServerSource{
|
||||||
|
Host: "rtmp.example.com",
|
||||||
|
RTMP: config.RTMPSource{Enabled: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
sendKey(t, screen, tcell.KeyF1, ' ')
|
||||||
|
|
||||||
|
require.EventuallyWithT(
|
||||||
|
t,
|
||||||
|
func(t *assert.CollectT) {
|
||||||
|
assert.True(t, contentsIncludes(getContents(), "rtmp://rtmp.example.com:1935/live"), "expected to see custom host name")
|
||||||
|
},
|
||||||
|
waitTime,
|
||||||
|
time.Second,
|
||||||
|
"expected to see custom host name",
|
||||||
|
)
|
||||||
|
printScreen(t, getContents, "Ater opening the app with a custom host name")
|
||||||
|
|
||||||
|
require.EventuallyWithT(
|
||||||
|
t,
|
||||||
|
func(c *assert.CollectT) {
|
||||||
|
conn, err := tls.Dial("tcp", "localhost:9997", &tls.Config{
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
})
|
||||||
|
require.NoError(c, err)
|
||||||
|
|
||||||
|
require.Nil(
|
||||||
|
c,
|
||||||
|
conn.
|
||||||
|
ConnectionState().
|
||||||
|
PeerCertificates[0].
|
||||||
|
VerifyHostname("rtmp.example.com"),
|
||||||
|
"expected to verify custom host name",
|
||||||
|
)
|
||||||
|
},
|
||||||
|
waitTime,
|
||||||
|
time.Second,
|
||||||
|
"expected to connect to API using self-signed TLS cert with custom host name",
|
||||||
|
)
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIntegrationCustomTLSCerts(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
||||||
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
configService := setupConfigService(t, config.Config{
|
||||||
|
Sources: config.Sources{
|
||||||
|
MediaServer: config.MediaServerSource{
|
||||||
|
TLS: &config.TLS{
|
||||||
|
CertPath: "testdata/server.crt",
|
||||||
|
KeyPath: "testdata/server.key",
|
||||||
|
},
|
||||||
|
RTMPS: config.RTMPSource{Enabled: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.EventuallyWithT(
|
||||||
|
t,
|
||||||
|
func(c *assert.CollectT) {
|
||||||
|
certPEM, err := os.ReadFile("testdata/server.crt")
|
||||||
|
require.NoError(c, err)
|
||||||
|
|
||||||
|
block, _ := pem.Decode(certPEM)
|
||||||
|
require.NotNil(c, block, "failed to decode PEM block containing certificate")
|
||||||
|
require.True(c, block.Type == "CERTIFICATE", "expected PEM block to be a certificate")
|
||||||
|
|
||||||
|
rootCAs := x509.NewCertPool()
|
||||||
|
require.True(c, rootCAs.AppendCertsFromPEM(certPEM), "failed to append cert to root CA pool")
|
||||||
|
|
||||||
|
conn, err := tls.Dial("tcp", "localhost:1936", &tls.Config{
|
||||||
|
RootCAs: rootCAs,
|
||||||
|
ServerName: "localhost",
|
||||||
|
InsecureSkipVerify: false,
|
||||||
|
})
|
||||||
|
require.NoError(c, err)
|
||||||
|
|
||||||
|
peerCert := conn.ConnectionState().PeerCertificates[0]
|
||||||
|
wantCert, err := x509.ParseCertificate(block.Bytes)
|
||||||
|
require.NoError(c, err)
|
||||||
|
require.True(c, peerCert.Equal(wantCert), "expected peer certificate to match the expected certificate")
|
||||||
|
},
|
||||||
|
waitTime,
|
||||||
|
time.Second,
|
||||||
|
)
|
||||||
|
|
||||||
|
printScreen(t, getContents, "After starting the app with custom TLS certs")
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
func TestIntegrationRestartDestination(t *testing.T) {
|
func TestIntegrationRestartDestination(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -285,7 +459,7 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{
|
configService := setupConfigService(t, config.Config{
|
||||||
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}},
|
Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}},
|
||||||
Destinations: []config.Destination{{
|
Destinations: []config.Destination{{
|
||||||
Name: "Local server 1",
|
Name: "Local server 1",
|
||||||
URL: fmt.Sprintf("rtmp://%s:%d/live", hostIP, destServerRTMPPort.Int()),
|
URL: fmt.Sprintf("rtmp://%s:%d/live", hostIP, destServerRTMPPort.Int()),
|
||||||
@ -303,13 +477,12 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected the mediaserver to start",
|
"expected the mediaserver to start",
|
||||||
)
|
)
|
||||||
@ -320,13 +493,12 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to receive an ingress stream",
|
"expected to receive an ingress stream",
|
||||||
)
|
)
|
||||||
@ -337,17 +509,16 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to start the destination stream",
|
"expected to start the destination stream",
|
||||||
)
|
)
|
||||||
@ -360,17 +531,16 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air")
|
assert.Contains(c, contents[2], "off-air", "expected local server 1 to be off-air")
|
||||||
assert.Contains(t, contents[2], "restarting", "expected local server 1 to be restarting")
|
assert.Contains(c, contents[2], "restarting", "expected local server 1 to be restarting")
|
||||||
},
|
},
|
||||||
20*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to begin restarting",
|
"expected to begin restarting",
|
||||||
)
|
)
|
||||||
@ -378,17 +548,16 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 4 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending")
|
||||||
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to restart the destination stream",
|
"expected to restart the destination stream",
|
||||||
)
|
)
|
||||||
@ -399,16 +568,15 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 4, "expected at least 4 lines of output")
|
|
||||||
|
|
||||||
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present")
|
||||||
assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited")
|
assert.Contains(c, contents[2], "exited", "expected local server 1 to have exited")
|
||||||
|
|
||||||
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to stop the destination stream",
|
"expected to stop the destination stream",
|
||||||
)
|
)
|
||||||
@ -431,7 +599,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
|||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{
|
configService := setupConfigService(t, config.Config{
|
||||||
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}},
|
Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}},
|
||||||
Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}},
|
Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}},
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -446,13 +614,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected the mediaserver to start",
|
"expected the mediaserver to start",
|
||||||
)
|
)
|
||||||
@ -463,13 +630,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to receive an ingress stream",
|
"expected to receive an ingress stream",
|
||||||
)
|
)
|
||||||
@ -480,12 +646,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
assert.True(t, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error")
|
assert.True(c, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error")
|
||||||
assert.True(t, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error")
|
assert.True(c, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see the destination start error modal",
|
"expected to see the destination start error modal",
|
||||||
)
|
)
|
||||||
@ -507,7 +673,7 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{
|
configService := setupConfigService(t, config.Config{
|
||||||
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true, StreamKey: "live"}},
|
Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: config.RTMPSource{Enabled: true}}},
|
||||||
})
|
})
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -521,14 +687,14 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
require.True(c, len(contents) > 2, "expected at least 3 lines of output")
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||||
assert.True(t, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to see no destinations message")
|
assert.True(c, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to see no destinations message")
|
||||||
},
|
},
|
||||||
2*time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected the mediaserver to start",
|
"expected the mediaserver to start",
|
||||||
)
|
)
|
||||||
@ -542,13 +708,13 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
|
|
||||||
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||||
assert.True(t, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
assert.True(c, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
||||||
},
|
},
|
||||||
10*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected a validation error for an empty URL",
|
"expected a validation error for an empty URL",
|
||||||
)
|
)
|
||||||
@ -562,13 +728,13 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
|
|
||||||
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||||
assert.True(t, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
assert.True(c, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error")
|
||||||
},
|
},
|
||||||
10*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected a validation error for an invalid URL",
|
"expected a validation error for an invalid URL",
|
||||||
)
|
)
|
||||||
@ -583,15 +749,14 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
||||||
|
|
||||||
require.Contains(t, contents[2], "My stream", "expected new destination to be present")
|
require.Contains(c, contents[2], "My stream", "expected new destination to be present")
|
||||||
assert.Contains(t, contents[2], "off-air", "expected new destination to be off-air")
|
assert.Contains(c, contents[2], "off-air", "expected new destination to be off-air")
|
||||||
assert.False(t, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to not see no destinations message")
|
assert.False(c, contentsIncludes(contents, "No destinations added yet"), "expected to not see no destinations message")
|
||||||
},
|
},
|
||||||
10*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to add the destination",
|
"expected to add the destination",
|
||||||
)
|
)
|
||||||
@ -609,13 +774,13 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
|
|
||||||
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
||||||
assert.True(t, contentsIncludes(contents, "validate: duplicate destination URL: rtmp://"), "expected to see config update error")
|
assert.True(c, contentsIncludes(contents, "validate: duplicate destination URL: rtmp://"), "expected to see config update error")
|
||||||
},
|
},
|
||||||
10*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected a validation error for a duplicate URL",
|
"expected a validation error for a duplicate URL",
|
||||||
)
|
)
|
||||||
@ -650,7 +815,7 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
|||||||
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}})
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -664,10 +829,10 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
assert.True(t, contentsIncludes(getContents(), "Another instance of Octoplex may already be running."), "expected to see startup check modal")
|
assert.True(c, contentsIncludes(getContents(), "Another instance of Octoplex may already be running."), "expected to see startup check modal")
|
||||||
},
|
},
|
||||||
30*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see startup check modal",
|
"expected to see startup check modal",
|
||||||
)
|
)
|
||||||
@ -677,13 +842,13 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
_, err := staleContainer.State(context.Background())
|
_, err := staleContainer.State(context.Background())
|
||||||
// IsRunning() does not work, probably because we're undercutting the
|
// IsRunning() does not work, probably because we're undercutting the
|
||||||
// testcontainers API.
|
// testcontainers API.
|
||||||
require.True(t, errdefs.IsNotFound(err), "expected to not find the container")
|
require.True(c, errdefs.IsNotFound(err), "expected to not find the container")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
2*time.Second,
|
2*time.Second,
|
||||||
"expected to quit the other containers",
|
"expected to quit the other containers",
|
||||||
)
|
)
|
||||||
@ -691,13 +856,13 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
contents := getContents()
|
contents := getContents()
|
||||||
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
require.True(c, len(contents) > 2, "expected at least 3 lines of output")
|
||||||
|
|
||||||
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
|
assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting")
|
||||||
},
|
},
|
||||||
10*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected the mediaserver to start",
|
"expected the mediaserver to start",
|
||||||
)
|
)
|
||||||
@ -719,7 +884,7 @@ func TestIntegrationMediaServerError(t *testing.T) {
|
|||||||
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}})
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -733,11 +898,11 @@ func TestIntegrationMediaServerError(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
assert.True(t, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title")
|
assert.True(c, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title")
|
||||||
assert.True(t, contentsIncludes(getContents(), "address already in use"), "expected to see message")
|
assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message")
|
||||||
},
|
},
|
||||||
time.Minute,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see media server error modal",
|
"expected to see media server error modal",
|
||||||
)
|
)
|
||||||
@ -758,7 +923,7 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
|||||||
var dockerClient mocks.DockerClient
|
var dockerClient mocks.DockerClient
|
||||||
dockerClient.EXPECT().NetworkCreate(mock.Anything, mock.Anything, mock.Anything).Return(network.CreateResponse{}, errors.New("boom"))
|
dockerClient.EXPECT().NetworkCreate(mock.Anything, mock.Anything, mock.Anything).Return(network.CreateResponse{}, errors.New("boom"))
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}})
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -776,11 +941,11 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
assert.True(t, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
assert.True(c, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
||||||
assert.True(t, contentsIncludes(getContents(), "create container client: network create: boom"), "expected to see message")
|
assert.True(c, contentsIncludes(getContents(), "create container client: network create: boom"), "expected to see message")
|
||||||
},
|
},
|
||||||
5*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see fatal error modal",
|
"expected to see fatal error modal",
|
||||||
)
|
)
|
||||||
@ -791,6 +956,7 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
|||||||
|
|
||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIntegrationDockerConnectionError(t *testing.T) {
|
func TestIntegrationDockerConnectionError(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -799,7 +965,7 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
|||||||
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost("http://docker.example.com"))
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost("http://docker.example.com"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}})
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@ -815,11 +981,11 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
|||||||
|
|
||||||
require.EventuallyWithT(
|
require.EventuallyWithT(
|
||||||
t,
|
t,
|
||||||
func(t *assert.CollectT) {
|
func(c *assert.CollectT) {
|
||||||
assert.True(t, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
assert.True(c, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message")
|
||||||
assert.True(t, contentsIncludes(getContents(), "Could not connect to Docker. Is Docker installed"), "expected to see message")
|
assert.True(c, contentsIncludes(getContents(), "Could not connect to Docker. Is Docker installed"), "expected to see message")
|
||||||
},
|
},
|
||||||
5*time.Second,
|
waitTime,
|
||||||
time.Second,
|
time.Second,
|
||||||
"expected to see fatal error modal",
|
"expected to see fatal error modal",
|
||||||
)
|
)
|
||||||
@ -830,3 +996,103 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
|||||||
|
|
||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIntegrationCopyURLs(t *testing.T) {
|
||||||
|
type binding struct {
|
||||||
|
key tcell.Key
|
||||||
|
content string
|
||||||
|
url string
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
mediaServerConfig config.MediaServerSource
|
||||||
|
wantBindings []binding
|
||||||
|
wantNot []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "RTMP only",
|
||||||
|
mediaServerConfig: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}},
|
||||||
|
wantBindings: []binding{
|
||||||
|
{
|
||||||
|
key: tcell.KeyF1,
|
||||||
|
content: "F1 Copy source RTMP URL",
|
||||||
|
url: "rtmp://localhost:1935/live",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantNot: []string{"F2"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "RTMPS only",
|
||||||
|
mediaServerConfig: config.MediaServerSource{RTMPS: config.RTMPSource{Enabled: true}},
|
||||||
|
wantBindings: []binding{
|
||||||
|
{
|
||||||
|
key: tcell.KeyF1,
|
||||||
|
content: "F1 Copy source RTMPS URL",
|
||||||
|
url: "rtmps://localhost:1936/live",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantNot: []string{"F2"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "RTMP and RTMPS",
|
||||||
|
mediaServerConfig: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}, RTMPS: config.RTMPSource{Enabled: true}},
|
||||||
|
wantBindings: []binding{
|
||||||
|
{
|
||||||
|
key: tcell.KeyF1,
|
||||||
|
content: "F1 Copy source RTMP URL",
|
||||||
|
url: "rtmp://localhost:1935/live",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: tcell.KeyF2,
|
||||||
|
content: "F2 Copy source RTMPS URL",
|
||||||
|
url: "rtmps://localhost:1936/live",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
||||||
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: tc.mediaServerConfig}})
|
||||||
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
printScreen(t, getContents, "Ater loading the app")
|
||||||
|
|
||||||
|
for _, want := range tc.wantBindings {
|
||||||
|
assert.True(t, contentsIncludes(getContents(), want.content), "expected to see %q", want)
|
||||||
|
|
||||||
|
sendKey(t, screen, want.key, ' ')
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
assert.True(t, contentsIncludes(getContents(), want.url), "expected to see copied message")
|
||||||
|
|
||||||
|
sendKey(t, screen, tcell.KeyEscape, ' ')
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, wantNot := range tc.wantNot {
|
||||||
|
assert.False(t, contentsIncludes(getContents(), wantNot), "expected to not see %q", wantNot)
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
<-done
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
17
internal/app/testdata/openssl.cnf
vendored
Normal file
17
internal/app/testdata/openssl.cnf
vendored
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
# openssl req -x509 -nodes -days 3650 -newkey rsa:2048 -keyout server.key -out server.crt -config openssl.cnf
|
||||||
|
|
||||||
|
[req]
|
||||||
|
default_bits = 2048
|
||||||
|
prompt = no
|
||||||
|
default_md = sha256
|
||||||
|
distinguished_name = dn
|
||||||
|
x509_extensions = v3_req
|
||||||
|
|
||||||
|
[dn]
|
||||||
|
CN = localhost
|
||||||
|
|
||||||
|
[v3_req]
|
||||||
|
subjectAltName = @alt_names
|
||||||
|
|
||||||
|
[alt_names]
|
||||||
|
DNS.1 = localhost
|
18
internal/app/testdata/server.crt
vendored
Normal file
18
internal/app/testdata/server.crt
vendored
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
-----BEGIN CERTIFICATE-----
|
||||||
|
MIIC7TCCAdWgAwIBAgIUTeqv46R19q+BS2e4DBkbIHuWyIIwDQYJKoZIhvcNAQEL
|
||||||
|
BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTI1MDQyMDA4NTMwN1oXDTM1MDQx
|
||||||
|
ODA4NTMwN1owFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF
|
||||||
|
AAOCAQ8AMIIBCgKCAQEA0v/KndfKfG8XItStHeMQ/3z1r8vhkH9KGpfSwDMp8MdH
|
||||||
|
Mox6vcAsIIr1RFKmalQQg+T+TK9v3XM6F4sJ+WPyb5/31xLUqG6zivitrMy1AZ8w
|
||||||
|
XLgAz/CTufXL3OBntDwg29QXWt9lOUJyjRa66AQqreTlItuLG65bswfPA4g35f+U
|
||||||
|
hyr49paukqnVHRr44GtyiNxlfYCEdQWdOR0EQmZ7y6WNQQhnR8odQyftR2lykf17
|
||||||
|
MSJ8us4JAgZ2fr1QR+DfX5bCSS/WJ2aO7xxeES40NizBx08qYFami1zXrGMMo35I
|
||||||
|
SfedCohcok8ZZ1oWL+MfSJ2OLVclDnznDPTx39pZPQIDAQABozcwNTAUBgNVHREE
|
||||||
|
DTALgglsb2NhbGhvc3QwHQYDVR0OBBYEFCgZah+m2NXkI9biS2vnhNUrd3FiMA0G
|
||||||
|
CSqGSIb3DQEBCwUAA4IBAQAPbofZIKCm3DnudFnK+LRkdlpMNOyH2zn3g8h8vrfL
|
||||||
|
Tfi0oBgHb7EYxcHYDanZbcIKracWCfQVze2FRLgNFBWiyhDO4IXe/LpwSnbyLWCh
|
||||||
|
psbGuyVmEz9CuiyVdIi+CWQs5dBBRUCFg6NE2/r6Diw9LD0fVCVUwkvqopetfp1B
|
||||||
|
tvA74O0RduLWs+iXNs5XW4sODVkrOmhBbRrP9GRCVqiqVWJka6CzrNdBm0Y9zZMQ
|
||||||
|
GD/6fEgDaW8YlShoO+e4FwmD2IgIx+m4xamr/cQkWpbOHMxAwv7vP0stfkpyUacW
|
||||||
|
dh9eJmsDAmgGgdtMJvbIfyR9ilG8D6zwOmSlkF6fDJ3E
|
||||||
|
-----END CERTIFICATE-----
|
28
internal/app/testdata/server.key
vendored
Normal file
28
internal/app/testdata/server.key
vendored
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
-----BEGIN PRIVATE KEY-----
|
||||||
|
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDS/8qd18p8bxci
|
||||||
|
1K0d4xD/fPWvy+GQf0oal9LAMynwx0cyjHq9wCwgivVEUqZqVBCD5P5Mr2/dczoX
|
||||||
|
iwn5Y/Jvn/fXEtSobrOK+K2szLUBnzBcuADP8JO59cvc4Ge0PCDb1Bda32U5QnKN
|
||||||
|
FrroBCqt5OUi24sbrluzB88DiDfl/5SHKvj2lq6SqdUdGvjga3KI3GV9gIR1BZ05
|
||||||
|
HQRCZnvLpY1BCGdHyh1DJ+1HaXKR/XsxIny6zgkCBnZ+vVBH4N9flsJJL9YnZo7v
|
||||||
|
HF4RLjQ2LMHHTypgVqaLXNesYwyjfkhJ950KiFyiTxlnWhYv4x9InY4tVyUOfOcM
|
||||||
|
9PHf2lk9AgMBAAECggEAC3E3qaukHW9gz9C8upwvtcsu/6OMzes5N4v4L9gWdCo6
|
||||||
|
YDFiDpw3SGSAvH3G7Ik2hBCNAdeZt2aiRdiSZ+XVpdwE8rLguWmXbvfhYzeOsVHS
|
||||||
|
q5SG5r/jIviDX60DsrB4D7PGuHTY5mwGDkSnSiG/tsJs8qD5QD0KWAEaZtSiQ2Sp
|
||||||
|
kcRbdq13/2tjHyx7nBxEYUFC4EJQjK3cNNV4G7nG2xcfT46uPvFV0+1CQtMpFYhi
|
||||||
|
IsGaSBhW9gOAheycYxCi+LRdUh1IAnLUyYUenu0o8PoXsHp6KD8eS5RXtfA6THd/
|
||||||
|
Jr614gdAB2Sffw+bFf6FIBNWa5Jwsg9UtbGtjNdo+QKBgQDrOJ2nj7El6MIqeDHs
|
||||||
|
1cCeGDKmjB1CYWALLHrwwiwmrvEoeBMiJuMN4epZdQw9hwExa7fNpERI7Ay8s5HD
|
||||||
|
cdppxgcW7CWChNncbVZ39P+YI9URWC2Q2Y8FBhc9FA0sKpDak0rf5UE63SGjU8/I
|
||||||
|
FGgwjd1Ln5wws00OsYXBZw1lzwKBgQDlo2kRy6xvrUNAbeggT9OQeg2SdkWqvS3v
|
||||||
|
NUhBzZkVhJNf1oApNRoAvRMQt+Xt+Euw1pQ+TvdOZQhhqxs/pD/wGdM7rhq9r0+G
|
||||||
|
itsQ5LvNCxCePbSkbFMLgC8JgNuM3aRqhtsU+Illk9xvCj2nKsd+UUN3NxYgjCqa
|
||||||
|
evTKSzUfMwKBgFapy1w7EteWxEMFec96ibc1zyORqA4W9l3ni3w87itqdSul4dbJ
|
||||||
|
YQpyW/eNqm7Y2NWujE/V39rGLYMw3dmWjxQ9g8ssQj2uWN5f4mXb/He/a/cx98fQ
|
||||||
|
gGMndVRpmNjW7fu6HPIU802Ov5//dySOcDzDZ+8+5TsENLXfLhqtrz/9AoGBALc+
|
||||||
|
/BQoTFTdlSHv0mEecjwDOZtbZ+KEjggpo5xm/TbPkW7T03eOmU5nkrQvm3qXPYdC
|
||||||
|
5A8Ioo5bTyHpEZhqcF8frJEeMNaW88XwPjmv3TEVGFC9+s2OZ4Jw6pgRzKEPKSmc
|
||||||
|
rWyBm9qD8E5nhKVGHOVu4YBbY/va/hBB998Jvr1DAoGBAK5nnswLyQZi0lgpkl1P
|
||||||
|
ITkmvnQlZBfuqvoD7wcQ3nx/K/mdacsxepRne+U/4+iNzRtd3gU0iccCWUTJl4aB
|
||||||
|
cFRW1eXWuff+4vmM4JToDevGPXrS0CHE20mATJRZPH+YjZFl0pFSc4/tnjxBnx4y
|
||||||
|
vgM382WU9N9jIHCCnM6DYsbK
|
||||||
|
-----END PRIVATE KEY-----
|
@ -22,15 +22,37 @@ func (l LogFile) GetPath() string {
|
|||||||
return cmp.Or(l.Path, l.defaultPath)
|
return cmp.Or(l.Path, l.defaultPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NetAddr holds an IP and/or port.
|
||||||
|
type NetAddr struct {
|
||||||
|
IP string `yaml:"ip,omitempty"`
|
||||||
|
Port int `yaml:"port,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// RTMPSource holds the configuration for the RTMP source.
|
// RTMPSource holds the configuration for the RTMP source.
|
||||||
type RTMPSource struct {
|
type RTMPSource struct {
|
||||||
Enabled bool `yaml:"enabled"`
|
Enabled bool `yaml:"enabled"`
|
||||||
|
|
||||||
|
NetAddr `yaml:",inline"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TLS holds the TLS configuration.
|
||||||
|
type TLS struct {
|
||||||
|
CertPath string `yaml:"cert,omitempty"`
|
||||||
|
KeyPath string `yaml:"key,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// MediaServerSource holds the configuration for the media server source.
|
||||||
|
type MediaServerSource struct {
|
||||||
StreamKey string `yaml:"streamKey,omitempty"`
|
StreamKey string `yaml:"streamKey,omitempty"`
|
||||||
|
Host string `yaml:"host,omitempty"`
|
||||||
|
TLS *TLS `yaml:"tls,omitempty"`
|
||||||
|
RTMP RTMPSource `yaml:"rtmp"`
|
||||||
|
RTMPS RTMPSource `yaml:"rtmps"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sources holds the configuration for the sources.
|
// Sources holds the configuration for the sources.
|
||||||
type Sources struct {
|
type Sources struct {
|
||||||
RTMP RTMPSource `yaml:"rtmp"`
|
MediaServer MediaServerSource `yaml:"mediaServer"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config holds the configuration for the application.
|
// Config holds the configuration for the application.
|
||||||
|
@ -180,10 +180,13 @@ func (s *Service) writeConfig(cfgBytes []byte) error {
|
|||||||
// populateConfigOnBuild is called to set default values for a new, empty
|
// populateConfigOnBuild is called to set default values for a new, empty
|
||||||
// configuration.
|
// configuration.
|
||||||
//
|
//
|
||||||
// This function may set exported fields to arbitrary values.
|
// This function may set serialized fields to arbitrary values.
|
||||||
func (s *Service) populateConfigOnBuild(cfg *Config) {
|
func (s *Service) populateConfigOnBuild(cfg *Config) {
|
||||||
cfg.Sources.RTMP.Enabled = true
|
cfg.Sources.MediaServer.StreamKey = "live"
|
||||||
cfg.Sources.RTMP.StreamKey = "live"
|
cfg.Sources.MediaServer.RTMP = RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: NetAddr{IP: "127.0.0.1", Port: 1935},
|
||||||
|
}
|
||||||
|
|
||||||
s.populateConfigOnRead(cfg)
|
s.populateConfigOnRead(cfg)
|
||||||
}
|
}
|
||||||
@ -191,7 +194,7 @@ func (s *Service) populateConfigOnBuild(cfg *Config) {
|
|||||||
// populateConfigOnRead is called to set default values for a configuration
|
// populateConfigOnRead is called to set default values for a configuration
|
||||||
// read from an existing file.
|
// read from an existing file.
|
||||||
//
|
//
|
||||||
// This function should not update any exported values, which would be a
|
// This function should not update any serialized values, which would be a
|
||||||
// confusing experience for the user.
|
// confusing experience for the user.
|
||||||
func (s *Service) populateConfigOnRead(cfg *Config) {
|
func (s *Service) populateConfigOnRead(cfg *Config) {
|
||||||
cfg.LogFile.defaultPath = filepath.Join(s.appStateDir, "octoplex.log")
|
cfg.LogFile.defaultPath = filepath.Join(s.appStateDir, "octoplex.log")
|
||||||
|
@ -19,6 +19,9 @@ import (
|
|||||||
//go:embed testdata/complete.yml
|
//go:embed testdata/complete.yml
|
||||||
var configComplete []byte
|
var configComplete []byte
|
||||||
|
|
||||||
|
//go:embed testdata/rtmps-only.yml
|
||||||
|
var configRTMPSOnly []byte
|
||||||
|
|
||||||
//go:embed testdata/logfile.yml
|
//go:embed testdata/logfile.yml
|
||||||
var configLogfile []byte
|
var configLogfile []byte
|
||||||
|
|
||||||
@ -44,7 +47,9 @@ func TestConfigServiceCurrent(t *testing.T) {
|
|||||||
t.Cleanup(func() { require.NoError(t, os.RemoveAll(systemConfigDir)) })
|
t.Cleanup(func() { require.NoError(t, os.RemoveAll(systemConfigDir)) })
|
||||||
|
|
||||||
// Ensure defaults are set:
|
// Ensure defaults are set:
|
||||||
assert.True(t, service.Current().Sources.RTMP.Enabled)
|
assert.NotNil(t, service.Current().Sources.MediaServer.RTMP)
|
||||||
|
assert.Equal(t, "127.0.0.1", service.Current().Sources.MediaServer.RTMP.IP)
|
||||||
|
assert.Equal(t, 1935, service.Current().Sources.MediaServer.RTMP.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConfigServiceCreateConfig(t *testing.T) {
|
func TestConfigServiceCreateConfig(t *testing.T) {
|
||||||
@ -67,7 +72,9 @@ func TestConfigServiceCreateConfig(t *testing.T) {
|
|||||||
|
|
||||||
var readCfg config.Config
|
var readCfg config.Config
|
||||||
require.NoError(t, yaml.Unmarshal(cfgBytes, &readCfg))
|
require.NoError(t, yaml.Unmarshal(cfgBytes, &readCfg))
|
||||||
assert.True(t, readCfg.Sources.RTMP.Enabled, "default values not set")
|
assert.NotNil(t, readCfg.Sources.MediaServer.RTMP)
|
||||||
|
assert.Equal(t, "127.0.0.1", readCfg.Sources.MediaServer.RTMP.IP)
|
||||||
|
assert.Equal(t, 1935, readCfg.Sources.MediaServer.RTMP.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConfigServiceReadConfig(t *testing.T) {
|
func TestConfigServiceReadConfig(t *testing.T) {
|
||||||
@ -90,9 +97,27 @@ func TestConfigServiceReadConfig(t *testing.T) {
|
|||||||
Path: "test.log",
|
Path: "test.log",
|
||||||
},
|
},
|
||||||
Sources: config.Sources{
|
Sources: config.Sources{
|
||||||
|
MediaServer: config.MediaServerSource{
|
||||||
|
StreamKey: "s3cr3t",
|
||||||
|
Host: "rtmp.example.com",
|
||||||
|
TLS: &config.TLS{
|
||||||
|
CertPath: "/etc/cert.pem",
|
||||||
|
KeyPath: "/etc/key.pem",
|
||||||
|
},
|
||||||
RTMP: config.RTMPSource{
|
RTMP: config.RTMPSource{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
StreamKey: "s3cr3t",
|
NetAddr: config.NetAddr{
|
||||||
|
IP: "0.0.0.0",
|
||||||
|
Port: 19350,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
RTMPS: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: config.NetAddr{
|
||||||
|
IP: "0.0.0.0",
|
||||||
|
Port: 19443,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Destinations: []config.Destination{
|
Destinations: []config.Destination{
|
||||||
@ -108,6 +133,33 @@ func TestConfigServiceReadConfig(t *testing.T) {
|
|||||||
)
|
)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "RTMPS only",
|
||||||
|
configBytes: configRTMPSOnly,
|
||||||
|
want: func(t *testing.T, cfg config.Config) {
|
||||||
|
require.Empty(
|
||||||
|
t,
|
||||||
|
gocmp.Diff(
|
||||||
|
config.Config{
|
||||||
|
LogFile: config.LogFile{Enabled: true},
|
||||||
|
Sources: config.Sources{
|
||||||
|
MediaServer: config.MediaServerSource{
|
||||||
|
RTMPS: config.RTMPSource{
|
||||||
|
Enabled: true,
|
||||||
|
NetAddr: config.NetAddr{
|
||||||
|
IP: "0.0.0.0",
|
||||||
|
Port: 1935,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
cfg,
|
||||||
|
cmpopts.IgnoreUnexported(config.LogFile{}),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "logging enabled, logfile",
|
name: "logging enabled, logfile",
|
||||||
configBytes: configLogfile,
|
configBytes: configLogfile,
|
||||||
|
13
internal/config/testdata/complete.yml
vendored
13
internal/config/testdata/complete.yml
vendored
@ -3,9 +3,20 @@ logfile:
|
|||||||
enabled: true
|
enabled: true
|
||||||
path: test.log
|
path: test.log
|
||||||
sources:
|
sources:
|
||||||
|
mediaServer:
|
||||||
|
streamKey: s3cr3t
|
||||||
|
host: rtmp.example.com
|
||||||
|
tls:
|
||||||
|
cert: /etc/cert.pem
|
||||||
|
key: /etc/key.pem
|
||||||
rtmp:
|
rtmp:
|
||||||
enabled: true
|
enabled: true
|
||||||
streamKey: s3cr3t
|
ip: 0.0.0.0
|
||||||
|
port: 19350
|
||||||
|
rtmps:
|
||||||
|
enabled: true
|
||||||
|
ip: 0.0.0.0
|
||||||
|
port: 19443
|
||||||
destinations:
|
destinations:
|
||||||
- name: my stream
|
- name: my stream
|
||||||
url: rtmp://rtmp.example.com:1935/live
|
url: rtmp://rtmp.example.com:1935/live
|
||||||
|
9
internal/config/testdata/rtmps-only.yml
vendored
Normal file
9
internal/config/testdata/rtmps-only.yml
vendored
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
---
|
||||||
|
logfile:
|
||||||
|
enabled: true
|
||||||
|
sources:
|
||||||
|
mediaServer:
|
||||||
|
rtmps:
|
||||||
|
enabled: true
|
||||||
|
ip: 0.0.0.0
|
||||||
|
port: 1935
|
@ -1,4 +1,4 @@
|
|||||||
package terminal
|
package domain
|
||||||
|
|
||||||
// CommandAddDestination adds a destination.
|
// CommandAddDestination adds a destination.
|
||||||
type CommandAddDestination struct {
|
type CommandAddDestination struct {
|
@ -34,9 +34,7 @@ type Source struct {
|
|||||||
Container Container
|
Container Container
|
||||||
Live bool
|
Live bool
|
||||||
LiveChangedAt time.Time
|
LiveChangedAt time.Time
|
||||||
Listeners int
|
|
||||||
Tracks []string
|
Tracks []string
|
||||||
RTMPURL string
|
|
||||||
ExitReason string
|
ExitReason string
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,6 +55,27 @@ type Destination struct {
|
|||||||
URL string
|
URL string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NetAddr holds a network address.
|
||||||
|
type NetAddr struct {
|
||||||
|
IP string
|
||||||
|
Port int
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsZero returns true if the NetAddr is zero value.
|
||||||
|
func (n NetAddr) IsZero() bool {
|
||||||
|
return n.IP == "" && n.Port == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyPair holds a TLS key pair.
|
||||||
|
type KeyPair struct {
|
||||||
|
Cert, Key []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsZero returns true if the KeyPair is zero value.
|
||||||
|
func (k KeyPair) IsZero() bool {
|
||||||
|
return k.Cert == nil && k.Key == nil
|
||||||
|
}
|
||||||
|
|
||||||
// Container status strings.
|
// Container status strings.
|
||||||
//
|
//
|
||||||
// TODO: refactor to strictly reflect Docker status strings.
|
// TODO: refactor to strictly reflect Docker status strings.
|
||||||
|
@ -31,3 +31,21 @@ func TestAppStateClone(t *testing.T) {
|
|||||||
s.Destinations[0].Name = "Twitch"
|
s.Destinations[0].Name = "Twitch"
|
||||||
assert.Equal(t, "YouTube", s2.Destinations[0].Name)
|
assert.Equal(t, "YouTube", s2.Destinations[0].Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNetAddr(t *testing.T) {
|
||||||
|
var addr domain.NetAddr
|
||||||
|
assert.True(t, addr.IsZero())
|
||||||
|
|
||||||
|
addr.IP = "127.0.0.1"
|
||||||
|
addr.Port = 3000
|
||||||
|
assert.False(t, addr.IsZero())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKeyPair(t *testing.T) {
|
||||||
|
var keyPair domain.KeyPair
|
||||||
|
assert.True(t, keyPair.IsZero())
|
||||||
|
|
||||||
|
keyPair.Cert = []byte("cert")
|
||||||
|
keyPair.Key = []byte("key")
|
||||||
|
assert.False(t, keyPair.IsZero())
|
||||||
|
}
|
||||||
|
@ -8,7 +8,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
typescontainer "github.com/docker/docker/api/types/container"
|
typescontainer "github.com/docker/docker/api/types/container"
|
||||||
@ -27,14 +27,22 @@ import (
|
|||||||
type StreamKey string
|
type StreamKey string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server
|
defaultUpdateStateInterval = 5 * time.Second // default interval to update the state of the media server
|
||||||
defaultAPIPort = 9997 // default API host port for the media server
|
defaultAPIPort = 9997 // default API host port for the media server
|
||||||
|
defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security
|
||||||
defaultRTMPPort = 1935 // default RTMP host port for the media server
|
defaultRTMPPort = 1935 // default RTMP host port for the media server
|
||||||
|
defaultRTMPSPort = 1936 // default RTMPS host port for the media server
|
||||||
|
defaultHost = "localhost" // default mediaserver host name
|
||||||
defaultChanSize = 64 // default channel size for asynchronous non-error channels
|
defaultChanSize = 64 // default channel size for asynchronous non-error channels
|
||||||
imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
|
imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
|
||||||
defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey].
|
defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey].
|
||||||
componentName = "mediaserver" // component name, mostly used for Docker labels
|
componentName = "mediaserver" // component name, mostly used for Docker labels
|
||||||
httpClientTimeout = time.Second // timeout for outgoing HTTP client requests
|
httpClientTimeout = time.Second // timeout for outgoing HTTP client requests
|
||||||
|
configPath = "/mediamtx.yml" // path to the media server config file
|
||||||
|
tlsInternalCertPath = "/etc/tls-internal.crt" // path to the internal TLS cert
|
||||||
|
tlsInternalKeyPath = "/etc/tls-internal.key" // path to the internal TLS key
|
||||||
|
tlsCertPath = "/etc/tls.crt" // path to the custom TLS cert
|
||||||
|
tlsKeyPath = "/etc/tls.key" // path to the custom TLS key
|
||||||
)
|
)
|
||||||
|
|
||||||
// action is an action to be performed by the actor.
|
// action is an action to be performed by the actor.
|
||||||
@ -46,12 +54,15 @@ type Actor struct {
|
|||||||
stateC chan domain.Source
|
stateC chan domain.Source
|
||||||
chanSize int
|
chanSize int
|
||||||
containerClient *container.Client
|
containerClient *container.Client
|
||||||
|
rtmpAddr domain.NetAddr
|
||||||
|
rtmpsAddr domain.NetAddr
|
||||||
apiPort int
|
apiPort int
|
||||||
rtmpPort int
|
host string
|
||||||
streamKey StreamKey
|
streamKey StreamKey
|
||||||
fetchIngressStateInterval time.Duration
|
updateStateInterval time.Duration
|
||||||
pass string // password for the media server
|
pass string // password for the media server
|
||||||
tlsCert, tlsKey []byte // TLS cert and key for the media server
|
keyPairInternal domain.KeyPair // TLS key pair for the media server
|
||||||
|
keyPairCustom domain.KeyPair // TLS key pair for the media server
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
apiClient *http.Client
|
apiClient *http.Client
|
||||||
|
|
||||||
@ -62,36 +73,69 @@ type Actor struct {
|
|||||||
// NewActorParams contains the parameters for building a new media server
|
// NewActorParams contains the parameters for building a new media server
|
||||||
// actor.
|
// actor.
|
||||||
type NewActorParams struct {
|
type NewActorParams struct {
|
||||||
|
RTMPAddr OptionalNetAddr // defaults to disabled, or 127.0.0.1:1935
|
||||||
|
RTMPSAddr OptionalNetAddr // defaults to disabled, or 127.0.0.1:1936
|
||||||
APIPort int // defaults to 9997
|
APIPort int // defaults to 9997
|
||||||
RTMPPort int // defaults to 1935
|
Host string // defaults to "localhost"
|
||||||
|
TLSCertPath string // defaults to empty
|
||||||
|
TLSKeyPath string // defaults to empty
|
||||||
StreamKey StreamKey // defaults to "live"
|
StreamKey StreamKey // defaults to "live"
|
||||||
ChanSize int // defaults to 64
|
ChanSize int // defaults to 64
|
||||||
FetchIngressStateInterval time.Duration // defaults to 5 seconds
|
UpdateStateInterval time.Duration // defaults to 5 seconds
|
||||||
ContainerClient *container.Client
|
ContainerClient *container.Client
|
||||||
Logger *slog.Logger
|
Logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OptionalNetAddr is a wrapper around domain.NetAddr that indicates whether it
|
||||||
|
// is enabled or not.
|
||||||
|
type OptionalNetAddr struct {
|
||||||
|
domain.NetAddr
|
||||||
|
|
||||||
|
Enabled bool
|
||||||
|
}
|
||||||
|
|
||||||
// NewActor creates a new media server actor.
|
// NewActor creates a new media server actor.
|
||||||
//
|
//
|
||||||
// Callers must consume the state channel exposed via [C].
|
// Callers must consume the state channel exposed via [C].
|
||||||
func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error) {
|
func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error) {
|
||||||
tlsCert, tlsKey, err := generateTLSCert()
|
dnsNames := []string{"localhost"}
|
||||||
|
if params.Host != "" {
|
||||||
|
dnsNames = append(dnsNames, params.Host)
|
||||||
|
}
|
||||||
|
|
||||||
|
keyPairInternal, err := generateTLSCert(dnsNames...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("generate TLS cert: %w", err)
|
return nil, fmt.Errorf("generate TLS cert: %w", err)
|
||||||
}
|
}
|
||||||
apiClient, err := buildAPIClient(tlsCert)
|
|
||||||
|
var keyPairCustom domain.KeyPair
|
||||||
|
if params.TLSCertPath != "" {
|
||||||
|
keyPairCustom.Cert, err = os.ReadFile(params.TLSCertPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read TLS cert: %w", err)
|
||||||
|
}
|
||||||
|
keyPairCustom.Key, err = os.ReadFile(params.TLSKeyPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read TLS key: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: custom cert for API?
|
||||||
|
apiClient, err := buildAPIClient(keyPairInternal.Cert)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("build API client: %w", err)
|
return nil, fmt.Errorf("build API client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
|
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
|
||||||
return &Actor{
|
return &Actor{
|
||||||
|
rtmpAddr: toRTMPAddr(params.RTMPAddr, defaultRTMPPort),
|
||||||
|
rtmpsAddr: toRTMPAddr(params.RTMPSAddr, defaultRTMPSPort),
|
||||||
apiPort: cmp.Or(params.APIPort, defaultAPIPort),
|
apiPort: cmp.Or(params.APIPort, defaultAPIPort),
|
||||||
rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort),
|
host: cmp.Or(params.Host, defaultHost),
|
||||||
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
|
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
|
||||||
fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval),
|
updateStateInterval: cmp.Or(params.UpdateStateInterval, defaultUpdateStateInterval),
|
||||||
tlsCert: tlsCert,
|
keyPairInternal: keyPairInternal,
|
||||||
tlsKey: tlsKey,
|
keyPairCustom: keyPairCustom,
|
||||||
pass: generatePassword(),
|
pass: generatePassword(),
|
||||||
actorC: make(chan action, chanSize),
|
actorC: make(chan action, chanSize),
|
||||||
state: new(domain.Source),
|
state: new(domain.Source),
|
||||||
@ -104,19 +148,140 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *Actor) Start(ctx context.Context) error {
|
func (a *Actor) Start(ctx context.Context) error {
|
||||||
// Exposed ports are bound to 127.0.0.1 for security.
|
var portSpecs []string
|
||||||
// TODO: configurable RTMP bind address
|
portSpecs = append(portSpecs, fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort))
|
||||||
apiPortSpec := nat.Port("127.0.0.1:" + strconv.Itoa(a.apiPort) + ":9997")
|
if !a.rtmpAddr.IsZero() {
|
||||||
rtmpPortSpec := nat.Port("127.0.0.1:" + strconv.Itoa(+a.rtmpPort) + ":1935")
|
portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935))
|
||||||
exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)})
|
}
|
||||||
|
if !a.rtmpsAddr.IsZero() {
|
||||||
|
portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpsAddr.IP, a.rtmpsAddr.Port, 1936))
|
||||||
|
}
|
||||||
|
exposedPorts, portBindings, err := nat.ParsePortSpecs(portSpecs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parse port specs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// The RTMP URL is passed to the UI via the state.
|
cfg, err := a.buildServerConfig()
|
||||||
// This could be refactored, it's not really stateful data.
|
if err != nil {
|
||||||
a.state.RTMPURL = a.RTMPURL()
|
return fmt.Errorf("build server config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
cfg, err := yaml.Marshal(
|
copyFiles := []container.CopyFileConfig{
|
||||||
|
{
|
||||||
|
Path: configPath,
|
||||||
|
Payload: bytes.NewReader(cfg),
|
||||||
|
Mode: 0600,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Path: tlsInternalCertPath,
|
||||||
|
Payload: bytes.NewReader(a.keyPairInternal.Cert),
|
||||||
|
Mode: 0600,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Path: tlsInternalKeyPath,
|
||||||
|
Payload: bytes.NewReader(a.keyPairInternal.Key),
|
||||||
|
Mode: 0600,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Path: "/etc/healthcheckopts.txt",
|
||||||
|
Payload: bytes.NewReader([]byte(fmt.Sprintf("--user api:%s", a.pass))),
|
||||||
|
Mode: 0600,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if !a.keyPairCustom.IsZero() {
|
||||||
|
copyFiles = append(
|
||||||
|
copyFiles,
|
||||||
|
container.CopyFileConfig{
|
||||||
|
Path: tlsCertPath,
|
||||||
|
Payload: bytes.NewReader(a.keyPairCustom.Cert),
|
||||||
|
Mode: 0600,
|
||||||
|
},
|
||||||
|
container.CopyFileConfig{
|
||||||
|
Path: tlsKeyPath,
|
||||||
|
Payload: bytes.NewReader(a.keyPairCustom.Key),
|
||||||
|
Mode: 0600,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
args := []any{"host", a.host}
|
||||||
|
if a.rtmpAddr.IsZero() {
|
||||||
|
args = append(args, "rtmp.enabled", false)
|
||||||
|
} else {
|
||||||
|
args = append(args, "rtmp.enabled", true, "rtmp.bind_addr", a.rtmpAddr.IP, "rtmp.bind_port", a.rtmpAddr.Port)
|
||||||
|
}
|
||||||
|
if a.rtmpsAddr.IsZero() {
|
||||||
|
args = append(args, "rtmps.enabled", false)
|
||||||
|
} else {
|
||||||
|
args = append(args, "rtmps.enabled", true, "rtmps.bind_addr", a.rtmpsAddr.IP, "rtmps.bind_port", a.rtmpsAddr.Port)
|
||||||
|
}
|
||||||
|
a.logger.Info("Starting media server", args...)
|
||||||
|
|
||||||
|
containerStateC, errC := a.containerClient.RunContainer(
|
||||||
|
ctx,
|
||||||
|
container.RunContainerParams{
|
||||||
|
Name: componentName,
|
||||||
|
ChanSize: a.chanSize,
|
||||||
|
ContainerConfig: &typescontainer.Config{
|
||||||
|
Image: imageNameMediaMTX,
|
||||||
|
Hostname: "mediaserver",
|
||||||
|
Labels: map[string]string{container.LabelComponent: componentName},
|
||||||
|
Healthcheck: &typescontainer.HealthConfig{
|
||||||
|
Test: []string{
|
||||||
|
"CMD",
|
||||||
|
"curl",
|
||||||
|
"--fail",
|
||||||
|
"--silent",
|
||||||
|
"--cacert", "/etc/tls-internal.crt",
|
||||||
|
"--config", "/etc/healthcheckopts.txt",
|
||||||
|
a.healthCheckURL(),
|
||||||
|
},
|
||||||
|
Interval: time.Second * 10,
|
||||||
|
StartPeriod: time.Second * 2,
|
||||||
|
StartInterval: time.Second * 2,
|
||||||
|
Retries: 2,
|
||||||
|
},
|
||||||
|
ExposedPorts: exposedPorts,
|
||||||
|
},
|
||||||
|
HostConfig: &typescontainer.HostConfig{
|
||||||
|
NetworkMode: "default",
|
||||||
|
PortBindings: portBindings,
|
||||||
|
},
|
||||||
|
NetworkCountConfig: container.NetworkCountConfig{Rx: "eth0", Tx: "eth1"},
|
||||||
|
Logs: container.LogConfig{Stdout: true},
|
||||||
|
CopyFiles: copyFiles,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
go a.actorLoop(ctx, containerStateC, errC)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Actor) buildServerConfig() ([]byte, error) {
|
||||||
|
// NOTE: Regardless of the user configuration (which mostly affects exposed
|
||||||
|
// ports and UI rendering) plain RTMP must be enabled at the container level,
|
||||||
|
// for internal connections.
|
||||||
|
var encryptionString string
|
||||||
|
if a.rtmpsAddr.IsZero() {
|
||||||
|
encryptionString = "no"
|
||||||
|
} else {
|
||||||
|
encryptionString = "optional"
|
||||||
|
}
|
||||||
|
|
||||||
|
var certPath, keyPath string
|
||||||
|
if a.keyPairCustom.IsZero() {
|
||||||
|
certPath = tlsInternalCertPath
|
||||||
|
keyPath = tlsInternalKeyPath
|
||||||
|
} else {
|
||||||
|
certPath = tlsCertPath
|
||||||
|
keyPath = tlsKeyPath
|
||||||
|
}
|
||||||
|
|
||||||
|
return yaml.Marshal(
|
||||||
Config{
|
Config{
|
||||||
LogLevel: "info",
|
LogLevel: "debug",
|
||||||
LogDestinations: []string{"stdout"},
|
LogDestinations: []string{"stdout"},
|
||||||
AuthMethod: "internal",
|
AuthMethod: "internal",
|
||||||
AuthInternalUsers: []User{
|
AuthInternalUsers: []User{
|
||||||
@ -142,79 +307,21 @@ func (a *Actor) Start(ctx context.Context) error {
|
|||||||
Permissions: []UserPermission{{Action: "api"}},
|
Permissions: []UserPermission{{Action: "api"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
RTMP: true,
|
||||||
|
RTMPEncryption: encryptionString,
|
||||||
|
RTMPAddress: ":1935",
|
||||||
|
RTMPSAddress: ":1936",
|
||||||
|
RTMPServerCert: certPath,
|
||||||
|
RTMPServerKey: keyPath,
|
||||||
API: true,
|
API: true,
|
||||||
APIEncryption: true,
|
APIEncryption: true,
|
||||||
APIServerCert: "/etc/tls.crt",
|
APIServerCert: tlsInternalCertPath,
|
||||||
APIServerKey: "/etc/tls.key",
|
APIServerKey: tlsInternalKeyPath,
|
||||||
Paths: map[string]Path{
|
Paths: map[string]Path{
|
||||||
string(a.streamKey): {Source: "publisher"},
|
string(a.streamKey): {Source: "publisher"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil { // should never happen
|
|
||||||
return fmt.Errorf("marshal config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
containerStateC, errC := a.containerClient.RunContainer(
|
|
||||||
ctx,
|
|
||||||
container.RunContainerParams{
|
|
||||||
Name: componentName,
|
|
||||||
ChanSize: a.chanSize,
|
|
||||||
ContainerConfig: &typescontainer.Config{
|
|
||||||
Image: imageNameMediaMTX,
|
|
||||||
Hostname: "mediaserver",
|
|
||||||
Labels: map[string]string{container.LabelComponent: componentName},
|
|
||||||
Healthcheck: &typescontainer.HealthConfig{
|
|
||||||
Test: []string{
|
|
||||||
"CMD",
|
|
||||||
"curl",
|
|
||||||
"--fail",
|
|
||||||
"--silent",
|
|
||||||
"--cacert", "/etc/tls.crt",
|
|
||||||
"--config", "/etc/healthcheckopts.txt",
|
|
||||||
a.healthCheckURL(),
|
|
||||||
},
|
|
||||||
Interval: time.Second * 10,
|
|
||||||
StartPeriod: time.Second * 2,
|
|
||||||
StartInterval: time.Second * 2,
|
|
||||||
Retries: 2,
|
|
||||||
},
|
|
||||||
ExposedPorts: exposedPorts,
|
|
||||||
},
|
|
||||||
HostConfig: &typescontainer.HostConfig{
|
|
||||||
NetworkMode: "default",
|
|
||||||
PortBindings: portBindings,
|
|
||||||
},
|
|
||||||
NetworkCountConfig: container.NetworkCountConfig{Rx: "eth0", Tx: "eth1"},
|
|
||||||
Logs: container.LogConfig{Stdout: true},
|
|
||||||
CopyFiles: []container.CopyFileConfig{
|
|
||||||
{
|
|
||||||
Path: "/mediamtx.yml",
|
|
||||||
Payload: bytes.NewReader(cfg),
|
|
||||||
Mode: 0600,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Path: "/etc/tls.crt",
|
|
||||||
Payload: bytes.NewReader(a.tlsCert),
|
|
||||||
Mode: 0600,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Path: "/etc/tls.key",
|
|
||||||
Payload: bytes.NewReader(a.tlsKey),
|
|
||||||
Mode: 0600,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Path: "/etc/healthcheckopts.txt",
|
|
||||||
Payload: bytes.NewReader([]byte(fmt.Sprintf("--user api:%s", a.pass))),
|
|
||||||
Mode: 0600,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
go a.actorLoop(ctx, containerStateC, errC)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// C returns a channel that will receive the current state of the media server.
|
// C returns a channel that will receive the current state of the media server.
|
||||||
@ -248,16 +355,8 @@ func (s *Actor) Close() error {
|
|||||||
// actorLoop is the main loop of the media server actor. It exits when the
|
// actorLoop is the main loop of the media server actor. It exits when the
|
||||||
// actor is closed, or the parent context is cancelled.
|
// actor is closed, or the parent context is cancelled.
|
||||||
func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Container, errC <-chan error) {
|
func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Container, errC <-chan error) {
|
||||||
fetchStateT := time.NewTicker(s.fetchIngressStateInterval)
|
updateStateT := time.NewTicker(s.updateStateInterval)
|
||||||
defer fetchStateT.Stop()
|
defer updateStateT.Stop()
|
||||||
|
|
||||||
// fetchTracksT is used to signal that tracks should be fetched from the
|
|
||||||
// media server, after the stream goes on-air. A short delay is needed due to
|
|
||||||
// workaround a race condition in the media server.
|
|
||||||
var fetchTracksT *time.Timer
|
|
||||||
resetFetchTracksT := func(d time.Duration) { fetchTracksT = time.NewTimer(d) }
|
|
||||||
resetFetchTracksT(time.Second)
|
|
||||||
fetchTracksT.Stop()
|
|
||||||
|
|
||||||
sendState := func() { s.stateC <- *s.state }
|
sendState := func() { s.stateC <- *s.state }
|
||||||
|
|
||||||
@ -267,7 +366,7 @@ func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Con
|
|||||||
s.state.Container = containerState
|
s.state.Container = containerState
|
||||||
|
|
||||||
if s.state.Container.Status == domain.ContainerStatusExited {
|
if s.state.Container.Status == domain.ContainerStatusExited {
|
||||||
fetchStateT.Stop()
|
updateStateT.Stop()
|
||||||
s.handleContainerExit(nil)
|
s.handleContainerExit(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -286,43 +385,21 @@ func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Con
|
|||||||
s.logger.Error("Error from container client", "err", err, "id", shortID(s.state.Container.ID))
|
s.logger.Error("Error from container client", "err", err, "id", shortID(s.state.Container.ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
fetchStateT.Stop()
|
updateStateT.Stop()
|
||||||
s.handleContainerExit(err)
|
s.handleContainerExit(err)
|
||||||
|
|
||||||
sendState()
|
sendState()
|
||||||
case <-fetchStateT.C:
|
case <-updateStateT.C:
|
||||||
ingressState, err := fetchIngressState(s.rtmpConnsURL(), s.streamKey, s.apiClient)
|
path, err := fetchPath(s.pathURL(string(s.streamKey)), s.apiClient)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("Error fetching server state", "err", err)
|
s.logger.Error("Error fetching path", "err", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var shouldSendState bool
|
if path.Ready != s.state.Live {
|
||||||
if ingressState.ready != s.state.Live {
|
s.state.Live = path.Ready
|
||||||
s.state.Live = ingressState.ready
|
|
||||||
s.state.LiveChangedAt = time.Now()
|
s.state.LiveChangedAt = time.Now()
|
||||||
resetFetchTracksT(time.Second)
|
s.state.Tracks = path.Tracks
|
||||||
shouldSendState = true
|
|
||||||
}
|
|
||||||
if ingressState.listeners != s.state.Listeners {
|
|
||||||
s.state.Listeners = ingressState.listeners
|
|
||||||
shouldSendState = true
|
|
||||||
}
|
|
||||||
if shouldSendState {
|
|
||||||
sendState()
|
|
||||||
}
|
|
||||||
case <-fetchTracksT.C:
|
|
||||||
if !s.state.Live {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if tracks, err := fetchTracks(s.pathsURL(), s.streamKey, s.apiClient); err != nil {
|
|
||||||
s.logger.Error("Error fetching tracks", "err", err)
|
|
||||||
resetFetchTracksT(3 * time.Second)
|
|
||||||
} else if len(tracks) == 0 {
|
|
||||||
resetFetchTracksT(time.Second)
|
|
||||||
} else {
|
|
||||||
s.state.Tracks = tracks
|
|
||||||
sendState()
|
sendState()
|
||||||
}
|
}
|
||||||
case action, ok := <-s.actorC:
|
case action, ok := <-s.actorC:
|
||||||
@ -352,7 +429,20 @@ func (s *Actor) handleContainerExit(err error) {
|
|||||||
|
|
||||||
// RTMPURL returns the RTMP URL for the media server, accessible from the host.
|
// RTMPURL returns the RTMP URL for the media server, accessible from the host.
|
||||||
func (s *Actor) RTMPURL() string {
|
func (s *Actor) RTMPURL() string {
|
||||||
return fmt.Sprintf("rtmp://localhost:%d/%s", s.rtmpPort, s.streamKey)
|
if s.rtmpAddr.IsZero() {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("rtmp://%s:%d/%s", s.host, s.rtmpAddr.Port, s.streamKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RTMPSURL returns the RTMPS URL for the media server, accessible from the host.
|
||||||
|
func (s *Actor) RTMPSURL() string {
|
||||||
|
if s.rtmpsAddr.IsZero() {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("rtmps://%s:%d/%s", s.host, s.rtmpsAddr.Port, s.streamKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RTMPInternalURL returns the RTMP URL for the media server, accessible from
|
// RTMPInternalURL returns the RTMP URL for the media server, accessible from
|
||||||
@ -362,15 +452,9 @@ func (s *Actor) RTMPInternalURL() string {
|
|||||||
return fmt.Sprintf("rtmp://mediaserver:1935/%s?user=api&pass=%s", s.streamKey, s.pass)
|
return fmt.Sprintf("rtmp://mediaserver:1935/%s?user=api&pass=%s", s.streamKey, s.pass)
|
||||||
}
|
}
|
||||||
|
|
||||||
// rtmpConnsURL returns the URL for fetching RTMP connections, accessible from
|
// pathURL returns the URL for fetching a path, accessible from the host.
|
||||||
// the host.
|
func (s *Actor) pathURL(path string) string {
|
||||||
func (s *Actor) rtmpConnsURL() string {
|
return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/get/%s", s.pass, s.apiPort, path)
|
||||||
return fmt.Sprintf("https://api:%s@localhost:%d/v3/rtmpconns/list", s.pass, s.apiPort)
|
|
||||||
}
|
|
||||||
|
|
||||||
// pathsURL returns the URL for fetching paths, accessible from the host.
|
|
||||||
func (s *Actor) pathsURL() string {
|
|
||||||
return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/list", s.pass, s.apiPort)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// healthCheckURL returns the URL for the health check, accessible from the
|
// healthCheckURL returns the URL for the health check, accessible from the
|
||||||
@ -396,3 +480,17 @@ func generatePassword() string {
|
|||||||
_, _ = rand.Read(p)
|
_, _ = rand.Read(p)
|
||||||
return fmt.Sprintf("%x", []byte(p))
|
return fmt.Sprintf("%x", []byte(p))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// toRTMPAddr builds a domain.NetAddr from an OptionalNetAddr, with default
|
||||||
|
// values set to RTMP default bind config if needed. If the OptionalNetAddr is
|
||||||
|
// not enabled, a zero value is returned.
|
||||||
|
func toRTMPAddr(a OptionalNetAddr, defaultPort int) domain.NetAddr {
|
||||||
|
if !a.Enabled {
|
||||||
|
return domain.NetAddr{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return domain.NetAddr{
|
||||||
|
IP: cmp.Or(a.IP, defaultRTMPIP),
|
||||||
|
Port: cmp.Or(a.Port, defaultPort),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -8,7 +8,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type httpClient interface {
|
type httpClient interface {
|
||||||
@ -44,109 +43,37 @@ func buildAPIClient(certPEM []byte) (*http.Client, error) {
|
|||||||
|
|
||||||
const userAgent = "octoplex-client"
|
const userAgent = "octoplex-client"
|
||||||
|
|
||||||
type apiResponse[T any] struct {
|
type apiPath struct {
|
||||||
Items []T `json:"items"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type rtmpConnsResponse struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
CreatedAt time.Time `json:"created"`
|
|
||||||
State string `json:"state"`
|
|
||||||
Path string `json:"path"`
|
|
||||||
BytesReceived int64 `json:"bytesReceived"`
|
|
||||||
BytesSent int64 `json:"bytesSent"`
|
|
||||||
RemoteAddr string `json:"remoteAddr"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ingressStreamState struct {
|
|
||||||
ready bool
|
|
||||||
listeners int
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: handle pagination
|
|
||||||
func fetchIngressState(apiURL string, streamKey StreamKey, httpClient httpClient) (state ingressStreamState, _ error) {
|
|
||||||
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
|
|
||||||
if err != nil {
|
|
||||||
return state, fmt.Errorf("new request: %w", err)
|
|
||||||
}
|
|
||||||
req.Header.Set("User-Agent", userAgent)
|
|
||||||
|
|
||||||
httpResp, err := httpClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return state, fmt.Errorf("do request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if httpResp.StatusCode != http.StatusOK {
|
|
||||||
return state, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
respBody, err := io.ReadAll(httpResp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return state, fmt.Errorf("read body: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var resp apiResponse[rtmpConnsResponse]
|
|
||||||
if err = json.Unmarshal(respBody, &resp); err != nil {
|
|
||||||
return state, fmt.Errorf("unmarshal: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, conn := range resp.Items {
|
|
||||||
if conn.Path != string(streamKey) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
switch conn.State {
|
|
||||||
case "publish":
|
|
||||||
// mediamtx may report a stream as being in publish state via the API,
|
|
||||||
// but still refuse to serve them due to being unpublished. This seems to
|
|
||||||
// be a bug, this is a hacky workaround.
|
|
||||||
state.ready = conn.BytesReceived > 20_000
|
|
||||||
case "read":
|
|
||||||
state.listeners++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return state, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type path struct {
|
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
|
Ready bool `json:"ready"`
|
||||||
Tracks []string `json:"tracks"`
|
Tracks []string `json:"tracks"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: handle pagination
|
func fetchPath(apiURL string, httpClient httpClient) (apiPath, error) {
|
||||||
func fetchTracks(apiURL string, streamKey StreamKey, httpClient httpClient) ([]string, error) {
|
|
||||||
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
|
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("new request: %w", err)
|
return apiPath{}, fmt.Errorf("new request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header.Set("User-Agent", userAgent)
|
req.Header.Set("User-Agent", userAgent)
|
||||||
|
|
||||||
httpResp, err := httpClient.Do(req)
|
httpResp, err := httpClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("do request: %w", err)
|
return apiPath{}, fmt.Errorf("do request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if httpResp.StatusCode != http.StatusOK {
|
if httpResp.StatusCode != http.StatusOK {
|
||||||
return nil, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
|
return apiPath{}, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
respBody, err := io.ReadAll(httpResp.Body)
|
respBody, err := io.ReadAll(httpResp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("read body: %w", err)
|
return apiPath{}, fmt.Errorf("read body: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp apiResponse[path]
|
var path apiPath
|
||||||
if err = json.Unmarshal(respBody, &resp); err != nil {
|
if err = json.Unmarshal(respBody, &path); err != nil {
|
||||||
return nil, fmt.Errorf("unmarshal: %w", err)
|
return apiPath{}, fmt.Errorf("unmarshal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var tracks []string
|
return path, nil
|
||||||
for _, path := range resp.Items {
|
|
||||||
if path.Name == string(streamKey) {
|
|
||||||
tracks = path.Tracks
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return tracks, nil
|
|
||||||
}
|
}
|
||||||
|
@ -12,14 +12,14 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFetchIngressState(t *testing.T) {
|
func TestFetchPath(t *testing.T) {
|
||||||
const url = "http://localhost:8989/v3/rtmpconns/list"
|
const url = "http://localhost:8989/v3/paths/get/live"
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
name string
|
name string
|
||||||
httpResponse *http.Response
|
httpResponse *http.Response
|
||||||
httpError error
|
httpError error
|
||||||
wantState ingressStreamState
|
wantPath apiPath
|
||||||
wantErr error
|
wantErr error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -36,36 +36,20 @@ func TestFetchIngressState(t *testing.T) {
|
|||||||
wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"),
|
wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "successful response, no streams",
|
name: "successful response, not ready",
|
||||||
httpResponse: &http.Response{
|
httpResponse: &http.Response{
|
||||||
StatusCode: http.StatusOK,
|
StatusCode: http.StatusOK,
|
||||||
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":0,"pageCount":0,"items":[]}`))),
|
Body: io.NopCloser(bytes.NewReader([]byte(`{"name":"live","confName":"live","source":null,"ready":false,"readyTime":null,"tracks":[],"bytesReceived":0,"bytesSent":0,"readers":[]}`))),
|
||||||
},
|
},
|
||||||
wantState: ingressStreamState{ready: false, listeners: 0},
|
wantPath: apiPath{Name: "live", Ready: false, Tracks: []string{}},
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "successful response, not yet ready",
|
|
||||||
httpResponse: &http.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"id":"d2953cf8-9cd6-4c30-816f-807b80b6a71f","created":"2025-02-15T08:19:00.616220354Z","remoteAddr":"172.17.0.1:32972","state":"publish","path":"live","query":"","bytesReceived":15462,"bytesSent":3467}]}`))),
|
|
||||||
},
|
|
||||||
wantState: ingressStreamState{ready: false, listeners: 0},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "successful response, ready",
|
name: "successful response, ready",
|
||||||
httpResponse: &http.Response{
|
httpResponse: &http.Response{
|
||||||
StatusCode: http.StatusOK,
|
StatusCode: http.StatusOK,
|
||||||
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"id":"d2953cf8-9cd6-4c30-816f-807b80b6a71f","created":"2025-02-15T08:19:00.616220354Z","remoteAddr":"172.17.0.1:32972","state":"publish","path":"live","query":"","bytesReceived":27832,"bytesSent":3467}]}`))),
|
Body: io.NopCloser(bytes.NewReader([]byte(`{"name":"live","confName":"live","source":{"type":"rtmpConn","id":"fd2d79a8-bab9-4141-a1b5-55bd1a8649df"},"ready":true,"readyTime":"2025-04-18T07:44:53.683627506Z","tracks":["H264"],"bytesReceived":254677,"bytesSent":0,"readers":[]}`))),
|
||||||
},
|
},
|
||||||
wantState: ingressStreamState{ready: true, listeners: 0},
|
wantPath: apiPath{Name: "live", Ready: true, Tracks: []string{"H264"}},
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "successful response, ready, with listeners",
|
|
||||||
httpResponse: &http.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":2,"pageCount":1,"items":[{"id":"12668315-0572-41f1-8384-fe7047cc73be","created":"2025-02-15T08:23:43.836589664Z","remoteAddr":"172.17.0.1:40026","state":"publish","path":"live","query":"","bytesReceived":7180753,"bytesSent":3467},{"id":"079370fd-43bb-4798-b079-860cc3159e4e","created":"2025-02-15T08:24:32.396794364Z","remoteAddr":"192.168.48.3:44736","state":"read","path":"live","query":"","bytesReceived":333435,"bytesSent":24243}]}`))),
|
|
||||||
},
|
|
||||||
wantState: ingressStreamState{ready: true, listeners: 1},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,74 +63,12 @@ func TestFetchIngressState(t *testing.T) {
|
|||||||
})).
|
})).
|
||||||
Return(tc.httpResponse, tc.httpError)
|
Return(tc.httpResponse, tc.httpError)
|
||||||
|
|
||||||
state, err := fetchIngressState(url, StreamKey("live"), &httpClient)
|
path, err := fetchPath(url, &httpClient)
|
||||||
if tc.wantErr != nil {
|
if tc.wantErr != nil {
|
||||||
require.EqualError(t, err, tc.wantErr.Error())
|
require.EqualError(t, err, tc.wantErr.Error())
|
||||||
} else {
|
} else {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, tc.wantState, state)
|
require.Equal(t, tc.wantPath, path)
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFetchTracks(t *testing.T) {
|
|
||||||
const url = "http://localhost:8989/v3/paths/list"
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
name string
|
|
||||||
httpResponse *http.Response
|
|
||||||
httpError error
|
|
||||||
wantTracks []string
|
|
||||||
wantErr error
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "non-200 status",
|
|
||||||
httpResponse: &http.Response{StatusCode: http.StatusNotFound},
|
|
||||||
wantErr: errors.New("unexpected status code: 404"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "unparseable response",
|
|
||||||
httpResponse: &http.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Body: io.NopCloser(bytes.NewReader([]byte("invalid json"))),
|
|
||||||
},
|
|
||||||
wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "successful response, no tracks",
|
|
||||||
httpResponse: &http.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":[],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))),
|
|
||||||
},
|
|
||||||
wantTracks: []string{},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "successful response, tracks",
|
|
||||||
httpResponse: &http.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":["H264","MPEG-4 Audio"],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))),
|
|
||||||
},
|
|
||||||
wantTracks: []string{"H264", "MPEG-4 Audio"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
var httpClient mocks.HTTPClient
|
|
||||||
httpClient.
|
|
||||||
EXPECT().
|
|
||||||
Do(mock.MatchedBy(func(req *http.Request) bool {
|
|
||||||
return req.URL.String() == url && req.Method == http.MethodGet
|
|
||||||
})).
|
|
||||||
Return(tc.httpResponse, tc.httpError)
|
|
||||||
|
|
||||||
tracks, err := fetchTracks(url, StreamKey("live"), &httpClient)
|
|
||||||
if tc.wantErr != nil {
|
|
||||||
require.EqualError(t, err, tc.wantErr.Error())
|
|
||||||
} else {
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, tc.wantTracks, tracks)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -17,8 +17,12 @@ type Config struct {
|
|||||||
APIEncryption bool `yaml:"apiEncryption,omitempty"`
|
APIEncryption bool `yaml:"apiEncryption,omitempty"`
|
||||||
APIServerCert string `yaml:"apiServerCert,omitempty"`
|
APIServerCert string `yaml:"apiServerCert,omitempty"`
|
||||||
APIServerKey string `yaml:"apiServerKey,omitempty"`
|
APIServerKey string `yaml:"apiServerKey,omitempty"`
|
||||||
RTMP bool `yaml:"rtmp,omitempty"`
|
RTMP bool `yaml:"rtmp"`
|
||||||
|
RTMPEncryption string `yaml:"rtmpEncryption,omitempty"`
|
||||||
RTMPAddress string `yaml:"rtmpAddress,omitempty"`
|
RTMPAddress string `yaml:"rtmpAddress,omitempty"`
|
||||||
|
RTMPSAddress string `yaml:"rtmpsAddress,omitempty"`
|
||||||
|
RTMPServerCert string `yaml:"rtmpServerCert,omitempty"`
|
||||||
|
RTMPServerKey string `yaml:"rtmpServerKey,omitempty"`
|
||||||
HLS bool `yaml:"hls"`
|
HLS bool `yaml:"hls"`
|
||||||
RTSP bool `yaml:"rtsp"`
|
RTSP bool `yaml:"rtsp"`
|
||||||
WebRTC bool `yaml:"webrtc"`
|
WebRTC bool `yaml:"webrtc"`
|
||||||
|
@ -10,23 +10,20 @@ import (
|
|||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"math/big"
|
"math/big"
|
||||||
"time"
|
"time"
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
"git.netflux.io/rob/octoplex/internal/domain"
|
||||||
tlsCert []byte
|
|
||||||
tlsKey []byte
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// generateTLSCert generates a self-signed TLS certificate and private key.
|
// generateTLSCert generates a self-signed TLS certificate and private key.
|
||||||
func generateTLSCert() (tlsCert, tlsKey, error) {
|
func generateTLSCert(dnsNames ...string) (domain.KeyPair, error) {
|
||||||
privKey, err := ecdsa.GenerateKey(elliptic.P384(), rand.Reader)
|
privKey, err := ecdsa.GenerateKey(elliptic.P384(), rand.Reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return domain.KeyPair{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
serialNumber, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128))
|
serialNumber, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return domain.KeyPair{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
@ -40,28 +37,31 @@ func generateTLSCert() (tlsCert, tlsKey, error) {
|
|||||||
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
|
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
|
||||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
|
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
|
||||||
BasicConstraintsValid: true,
|
BasicConstraintsValid: true,
|
||||||
DNSNames: []string{"localhost"},
|
DNSNames: dnsNames,
|
||||||
}
|
}
|
||||||
|
|
||||||
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &privKey.PublicKey, privKey)
|
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &privKey.PublicKey, privKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return domain.KeyPair{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var certPEM, keyPEM bytes.Buffer
|
var certPEM, keyPEM bytes.Buffer
|
||||||
|
|
||||||
if err = pem.Encode(&certPEM, &pem.Block{Type: "CERTIFICATE", Bytes: certDER}); err != nil {
|
if err = pem.Encode(&certPEM, &pem.Block{Type: "CERTIFICATE", Bytes: certDER}); err != nil {
|
||||||
return nil, nil, err
|
return domain.KeyPair{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
privKeyDER, err := x509.MarshalECPrivateKey(privKey)
|
privKeyDER, err := x509.MarshalECPrivateKey(privKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return domain.KeyPair{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := pem.Encode(&keyPEM, &pem.Block{Type: "EC PRIVATE KEY", Bytes: privKeyDER}); err != nil {
|
if err := pem.Encode(&keyPEM, &pem.Block{Type: "EC PRIVATE KEY", Bytes: privKeyDER}); err != nil {
|
||||||
return nil, nil, err
|
return domain.KeyPair{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return certPEM.Bytes(), keyPEM.Bytes(), nil
|
return domain.KeyPair{
|
||||||
|
Cert: certPEM.Bytes(),
|
||||||
|
Key: keyPEM.Bytes(),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -12,12 +12,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestGenerateTLSCert(t *testing.T) {
|
func TestGenerateTLSCert(t *testing.T) {
|
||||||
certPEM, keyPEM, err := generateTLSCert()
|
keyPair, err := generateTLSCert("localhost", "rtmp.example.com")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotEmpty(t, certPEM)
|
require.NotEmpty(t, keyPair.Cert)
|
||||||
require.NotEmpty(t, keyPEM)
|
require.NotEmpty(t, keyPair.Key)
|
||||||
|
|
||||||
block, _ := pem.Decode(certPEM)
|
block, _ := pem.Decode(keyPair.Cert)
|
||||||
require.NotNil(t, block, "failed to decode certificate PEM")
|
require.NotNil(t, block, "failed to decode certificate PEM")
|
||||||
|
|
||||||
cert, err := x509.ParseCertificate(block.Bytes)
|
cert, err := x509.ParseCertificate(block.Bytes)
|
||||||
@ -33,8 +33,10 @@ func TestGenerateTLSCert(t *testing.T) {
|
|||||||
assert.True(t, cert.BasicConstraintsValid, "basic constraints should be valid")
|
assert.True(t, cert.BasicConstraintsValid, "basic constraints should be valid")
|
||||||
assert.Contains(t, cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth)
|
assert.Contains(t, cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth)
|
||||||
assert.Contains(t, cert.ExtKeyUsage, x509.ExtKeyUsageClientAuth)
|
assert.Contains(t, cert.ExtKeyUsage, x509.ExtKeyUsageClientAuth)
|
||||||
|
assert.Contains(t, cert.DNSNames, "localhost", "DNS names should include localhost")
|
||||||
|
assert.Contains(t, cert.DNSNames, "rtmp.example.com", "DNS names should include rtmp.example.com")
|
||||||
|
|
||||||
block, _ = pem.Decode(keyPEM)
|
block, _ = pem.Decode(keyPair.Key)
|
||||||
require.NotNil(t, block, "failed to decode private key PEM")
|
require.NotNil(t, block, "failed to decode private key PEM")
|
||||||
|
|
||||||
privKey, err := x509.ParseECPrivateKey(block.Bytes)
|
privKey, err := x509.ParseECPrivateKey(block.Bytes)
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type sourceViews struct {
|
type sourceViews struct {
|
||||||
url *tview.TextView
|
|
||||||
status *tview.TextView
|
status *tview.TextView
|
||||||
tracks *tview.TextView
|
tracks *tview.TextView
|
||||||
health *tview.TextView
|
health *tview.TextView
|
||||||
@ -41,9 +40,10 @@ const (
|
|||||||
|
|
||||||
// UI is responsible for managing the terminal user interface.
|
// UI is responsible for managing the terminal user interface.
|
||||||
type UI struct {
|
type UI struct {
|
||||||
commandC chan Command
|
commandC chan domain.Command
|
||||||
clipboardAvailable bool
|
clipboardAvailable bool
|
||||||
configFilePath string
|
configFilePath string
|
||||||
|
rtmpURL, rtmpsURL string
|
||||||
buildInfo domain.BuildInfo
|
buildInfo domain.BuildInfo
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
|
|
||||||
@ -57,6 +57,7 @@ type UI struct {
|
|||||||
sourceViews sourceViews
|
sourceViews sourceViews
|
||||||
destView *tview.Table
|
destView *tview.Table
|
||||||
noDestView *tview.TextView
|
noDestView *tview.TextView
|
||||||
|
aboutView *tview.Flex
|
||||||
pullProgressModal *tview.Modal
|
pullProgressModal *tview.Modal
|
||||||
|
|
||||||
// other mutable state
|
// other mutable state
|
||||||
@ -105,7 +106,7 @@ const defaultChanSize = 64
|
|||||||
// StartUI starts the terminal user interface.
|
// StartUI starts the terminal user interface.
|
||||||
func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
||||||
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
|
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
|
||||||
commandCh := make(chan Command, chanSize)
|
commandCh := make(chan domain.Command, chanSize)
|
||||||
|
|
||||||
app := tview.NewApplication()
|
app := tview.NewApplication()
|
||||||
|
|
||||||
@ -127,8 +128,8 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
sourceView := tview.NewFlex()
|
sourceView := tview.NewFlex()
|
||||||
sourceView.SetDirection(tview.FlexColumn)
|
sourceView.SetDirection(tview.FlexColumn)
|
||||||
sourceView.SetBorder(true)
|
sourceView.SetBorder(true)
|
||||||
sourceView.SetTitle("Source RTMP server")
|
sourceView.SetTitle("Source")
|
||||||
sidebar.AddItem(sourceView, 9, 0, false)
|
sidebar.AddItem(sourceView, 8, 0, false)
|
||||||
|
|
||||||
leftCol := tview.NewFlex()
|
leftCol := tview.NewFlex()
|
||||||
leftCol.SetDirection(tview.FlexRow)
|
leftCol.SetDirection(tview.FlexRow)
|
||||||
@ -137,11 +138,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
sourceView.AddItem(leftCol, 9, 0, false)
|
sourceView.AddItem(leftCol, 9, 0, false)
|
||||||
sourceView.AddItem(rightCol, 0, 1, false)
|
sourceView.AddItem(rightCol, 0, 1, false)
|
||||||
|
|
||||||
urlHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerURL)
|
|
||||||
leftCol.AddItem(urlHeaderTextView, 1, 0, false)
|
|
||||||
urlTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
|
|
||||||
rightCol.AddItem(urlTextView, 1, 0, false)
|
|
||||||
|
|
||||||
statusHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerStatus)
|
statusHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerStatus)
|
||||||
leftCol.AddItem(statusHeaderTextView, 1, 0, false)
|
leftCol.AddItem(statusHeaderTextView, 1, 0, false)
|
||||||
statusTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
|
statusTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
|
||||||
@ -176,13 +172,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
aboutView.SetDirection(tview.FlexRow)
|
aboutView.SetDirection(tview.FlexRow)
|
||||||
aboutView.SetBorder(true)
|
aboutView.SetBorder(true)
|
||||||
aboutView.SetTitle("Actions")
|
aboutView.SetTitle("Actions")
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]a[-] Add destination"), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Del[-] Remove destination"), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Space[-] Start/stop destination"), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText(""), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]u[-] Copy source RTMP URL"), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
|
|
||||||
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
|
|
||||||
|
|
||||||
sidebar.AddItem(aboutView, 0, 1, false)
|
sidebar.AddItem(aboutView, 0, 1, false)
|
||||||
|
|
||||||
@ -232,7 +221,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
pages: pages,
|
pages: pages,
|
||||||
container: container,
|
container: container,
|
||||||
sourceViews: sourceViews{
|
sourceViews: sourceViews{
|
||||||
url: urlTextView,
|
|
||||||
status: statusTextView,
|
status: statusTextView,
|
||||||
tracks: tracksTextView,
|
tracks: tracksTextView,
|
||||||
health: healthTextView,
|
health: healthTextView,
|
||||||
@ -242,6 +230,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
},
|
},
|
||||||
destView: destView,
|
destView: destView,
|
||||||
noDestView: noDestView,
|
noDestView: noDestView,
|
||||||
|
aboutView: aboutView,
|
||||||
pullProgressModal: pullProgressModal,
|
pullProgressModal: pullProgressModal,
|
||||||
urlsToStartState: make(map[string]startState),
|
urlsToStartState: make(map[string]startState),
|
||||||
}
|
}
|
||||||
@ -254,8 +243,32 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
|||||||
return ui, nil
|
return ui, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ui *UI) renderAboutView() {
|
||||||
|
ui.aboutView.Clear()
|
||||||
|
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]a[-] Add destination"), 1, 0, false)
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Del[-] Remove destination"), 1, 0, false)
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Space[-] Start/stop destination"), 1, 0, false)
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText(""), 1, 0, false)
|
||||||
|
|
||||||
|
i := 1
|
||||||
|
if ui.rtmpURL != "" {
|
||||||
|
rtmpURLView := tview.NewTextView().SetDynamicColors(true).SetText(fmt.Sprintf("[grey]F%d[-] Copy source RTMP URL", i))
|
||||||
|
ui.aboutView.AddItem(rtmpURLView, 1, 0, false)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
|
||||||
|
if ui.rtmpsURL != "" {
|
||||||
|
rtmpsURLView := tview.NewTextView().SetDynamicColors(true).SetText(fmt.Sprintf("[grey]F%d[-] Copy source RTMPS URL", i))
|
||||||
|
ui.aboutView.AddItem(rtmpsURLView, 1, 0, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
|
||||||
|
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
|
||||||
|
}
|
||||||
|
|
||||||
// C returns a channel that receives commands from the user interface.
|
// C returns a channel that receives commands from the user interface.
|
||||||
func (ui *UI) C() <-chan Command {
|
func (ui *UI) C() <-chan domain.Command {
|
||||||
return ui.commandC
|
return ui.commandC
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -283,6 +296,17 @@ func (ui *UI) run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetRTMPURLs sets the RTMP and RTMPS URLs for the user interface, which are
|
||||||
|
// unavailable when the UI is first created.
|
||||||
|
func (ui *UI) SetRTMPURLs(rtmpURL, rtmpsURL string) {
|
||||||
|
ui.mu.Lock()
|
||||||
|
ui.rtmpURL = rtmpURL
|
||||||
|
ui.rtmpsURL = rtmpsURL
|
||||||
|
ui.mu.Unlock()
|
||||||
|
|
||||||
|
ui.renderAboutView()
|
||||||
|
}
|
||||||
|
|
||||||
func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
||||||
// Special case: handle CTRL-C even when a modal is visible.
|
// Special case: handle CTRL-C even when a modal is visible.
|
||||||
if event.Key() == tcell.KeyCtrlC {
|
if event.Key() == tcell.KeyCtrlC {
|
||||||
@ -309,8 +333,6 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
|||||||
return nil
|
return nil
|
||||||
case ' ':
|
case ' ':
|
||||||
ui.toggleDestination()
|
ui.toggleDestination()
|
||||||
case 'u', 'U':
|
|
||||||
ui.copySourceURLToClipboard(ui.clipboardAvailable)
|
|
||||||
case 'c', 'C':
|
case 'c', 'C':
|
||||||
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
|
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
|
||||||
case '?':
|
case '?':
|
||||||
@ -318,6 +340,8 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
|||||||
case 'k': // tview vim bindings
|
case 'k': // tview vim bindings
|
||||||
handleKeyUp()
|
handleKeyUp()
|
||||||
}
|
}
|
||||||
|
case tcell.KeyF1, tcell.KeyF2:
|
||||||
|
ui.fkeyHandler(event.Key())
|
||||||
case tcell.KeyDelete, tcell.KeyBackspace, tcell.KeyBackspace2:
|
case tcell.KeyDelete, tcell.KeyBackspace, tcell.KeyBackspace2:
|
||||||
ui.removeDestination()
|
ui.removeDestination()
|
||||||
return nil
|
return nil
|
||||||
@ -328,11 +352,34 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
|||||||
return event
|
return event
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ui *UI) fkeyHandler(key tcell.Key) {
|
||||||
|
var urls []string
|
||||||
|
if ui.rtmpURL != "" {
|
||||||
|
urls = append(urls, ui.rtmpURL)
|
||||||
|
}
|
||||||
|
if ui.rtmpsURL != "" {
|
||||||
|
urls = append(urls, ui.rtmpsURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch key {
|
||||||
|
case tcell.KeyF1:
|
||||||
|
if len(urls) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ui.copySourceURLToClipboard(urls[0])
|
||||||
|
case tcell.KeyF2:
|
||||||
|
if len(urls) < 2 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ui.copySourceURLToClipboard(urls[1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (ui *UI) ShowSourceNotLiveModal() {
|
func (ui *UI) ShowSourceNotLiveModal() {
|
||||||
ui.app.QueueUpdateDraw(func() {
|
ui.app.QueueUpdateDraw(func() {
|
||||||
ui.showModal(
|
ui.showModal(
|
||||||
pageNameModalStartupCheck,
|
pageNameModalNotLive,
|
||||||
fmt.Sprintf("Waiting for stream.\nStart streaming to the source URL then try again:\n\n%s", ui.sourceViews.url.GetText(true)),
|
"Waiting for stream.\n\nStart streaming to a source URL then try again.",
|
||||||
[]string{"Ok"},
|
[]string{"Ok"},
|
||||||
false,
|
false,
|
||||||
nil,
|
nil,
|
||||||
@ -397,7 +444,7 @@ func (ui *UI) ShowFatalErrorModal(errString string) {
|
|||||||
[]string{"Quit"},
|
[]string{"Quit"},
|
||||||
false,
|
false,
|
||||||
func(int, string) {
|
func(int, string) {
|
||||||
ui.commandC <- CommandQuit{}
|
ui.commandC <- domain.CommandQuit{}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
@ -519,6 +566,7 @@ func (ui *UI) updateProgressModal(container domain.Container) {
|
|||||||
const (
|
const (
|
||||||
pageNameMain = "main"
|
pageNameMain = "main"
|
||||||
pageNameAddDestination = "add-destination"
|
pageNameAddDestination = "add-destination"
|
||||||
|
pageNameViewURLs = "view-urls"
|
||||||
pageNameConfigUpdateFailed = "modal-config-update-failed"
|
pageNameConfigUpdateFailed = "modal-config-update-failed"
|
||||||
pageNameNoDestinations = "no-destinations"
|
pageNameNoDestinations = "no-destinations"
|
||||||
pageNameModalAbout = "modal-about"
|
pageNameModalAbout = "modal-about"
|
||||||
@ -530,6 +578,7 @@ const (
|
|||||||
pageNameModalRemoveDestination = "modal-remove-destination"
|
pageNameModalRemoveDestination = "modal-remove-destination"
|
||||||
pageNameModalSourceError = "modal-source-error"
|
pageNameModalSourceError = "modal-source-error"
|
||||||
pageNameModalStartupCheck = "modal-startup-check"
|
pageNameModalStartupCheck = "modal-startup-check"
|
||||||
|
pageNameModalNotLive = "modal-not-live"
|
||||||
)
|
)
|
||||||
|
|
||||||
// modalVisible returns true if any modal, including the add destination form,
|
// modalVisible returns true if any modal, including the add destination form,
|
||||||
@ -648,7 +697,7 @@ func (ui *UI) handleMediaServerClosed(exitReason string) {
|
|||||||
SetBackgroundColor(tcell.ColorBlack).
|
SetBackgroundColor(tcell.ColorBlack).
|
||||||
SetTextColor(tcell.ColorWhite).
|
SetTextColor(tcell.ColorWhite).
|
||||||
SetDoneFunc(func(int, string) {
|
SetDoneFunc(func(int, string) {
|
||||||
ui.commandC <- CommandQuit{}
|
ui.commandC <- domain.CommandQuit{}
|
||||||
})
|
})
|
||||||
modal.SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite))
|
modal.SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite))
|
||||||
|
|
||||||
@ -696,8 +745,6 @@ func (ui *UI) redrawFromState(state domain.AppState) {
|
|||||||
SetSelectable(false)
|
SetSelectable(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
ui.sourceViews.url.SetText(state.Source.RTMPURL)
|
|
||||||
|
|
||||||
tracks := dash
|
tracks := dash
|
||||||
if state.Source.Live && len(state.Source.Tracks) > 0 {
|
if state.Source.Live && len(state.Source.Tracks) > 0 {
|
||||||
tracks = strings.Join(state.Source.Tracks, ", ")
|
tracks = strings.Join(state.Source.Tracks, ", ")
|
||||||
@ -840,7 +887,7 @@ func (ui *UI) addDestination() {
|
|||||||
AddInputField(inputLabelName, "My stream", inputLen, nil, nil).
|
AddInputField(inputLabelName, "My stream", inputLen, nil, nil).
|
||||||
AddInputField(inputLabelURL, "rtmp://", inputLen, nil, nil).
|
AddInputField(inputLabelURL, "rtmp://", inputLen, nil, nil).
|
||||||
AddButton("Add", func() {
|
AddButton("Add", func() {
|
||||||
ui.commandC <- CommandAddDestination{
|
ui.commandC <- domain.CommandAddDestination{
|
||||||
DestinationName: form.GetFormItemByLabel(inputLabelName).(*tview.InputField).GetText(),
|
DestinationName: form.GetFormItemByLabel(inputLabelName).(*tview.InputField).GetText(),
|
||||||
URL: form.GetFormItemByLabel(inputLabelURL).(*tview.InputField).GetText(),
|
URL: form.GetFormItemByLabel(inputLabelURL).(*tview.InputField).GetText(),
|
||||||
}
|
}
|
||||||
@ -898,7 +945,7 @@ func (ui *UI) removeDestination() {
|
|||||||
false,
|
false,
|
||||||
func(buttonIndex int, _ string) {
|
func(buttonIndex int, _ string) {
|
||||||
if buttonIndex == 0 {
|
if buttonIndex == 0 {
|
||||||
ui.commandC <- CommandRemoveDestination{URL: url}
|
ui.commandC <- domain.CommandRemoveDestination{URL: url}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@ -962,22 +1009,21 @@ func (ui *UI) toggleDestination() {
|
|||||||
switch ss {
|
switch ss {
|
||||||
case startStateNotStarted:
|
case startStateNotStarted:
|
||||||
ui.urlsToStartState[url] = startStateStarting
|
ui.urlsToStartState[url] = startStateStarting
|
||||||
ui.commandC <- CommandStartDestination{URL: url}
|
ui.commandC <- domain.CommandStartDestination{URL: url}
|
||||||
case startStateStarting:
|
case startStateStarting:
|
||||||
// do nothing
|
// do nothing
|
||||||
return
|
return
|
||||||
case startStateStarted:
|
case startStateStarted:
|
||||||
ui.commandC <- CommandStopDestination{URL: url}
|
ui.commandC <- domain.CommandStopDestination{URL: url}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ui *UI) copySourceURLToClipboard(clipboardAvailable bool) {
|
func (ui *UI) copySourceURLToClipboard(url string) {
|
||||||
var text string
|
var text string
|
||||||
|
|
||||||
url := ui.sourceViews.url.GetText(true)
|
if ui.clipboardAvailable {
|
||||||
if clipboardAvailable {
|
|
||||||
clipboard.Write(clipboard.FmtText, []byte(url))
|
clipboard.Write(clipboard.FmtText, []byte(url))
|
||||||
text = "Source URL copied to clipboard:\n\n" + url
|
text = "URL copied to clipboard:\n\n" + url
|
||||||
} else {
|
} else {
|
||||||
text = "Copy to clipboard not available:\n\n" + url
|
text = "Copy to clipboard not available:\n\n" + url
|
||||||
}
|
}
|
||||||
@ -1021,7 +1067,7 @@ func (ui *UI) confirmQuit() {
|
|||||||
false,
|
false,
|
||||||
func(buttonIndex int, _ string) {
|
func(buttonIndex int, _ string) {
|
||||||
if buttonIndex == 0 {
|
if buttonIndex == 0 {
|
||||||
ui.commandC <- CommandQuit{}
|
ui.commandC <- domain.CommandQuit{}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
@ -29,6 +29,12 @@ dir = "{{cwd}}"
|
|||||||
run = "golangci-lint run"
|
run = "golangci-lint run"
|
||||||
alias = "l"
|
alias = "l"
|
||||||
|
|
||||||
|
[tasks.fmt]
|
||||||
|
description = "Run formatter"
|
||||||
|
dir = "{{cwd}}"
|
||||||
|
run = "goimports -w ."
|
||||||
|
alias = "f"
|
||||||
|
|
||||||
[tasks.generate_mocks]
|
[tasks.generate_mocks]
|
||||||
description = "Generate mocks"
|
description = "Generate mocks"
|
||||||
dir = "{{cwd}}"
|
dir = "{{cwd}}"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user