Compare commits

..

1 Commits

Author SHA1 Message Date
d4c4db84ca build: set default permissions
Some checks failed
ci-build / lint (push) Has been cancelled
ci-build / build (push) Has been cancelled
ci-build / release (push) Has been cancelled
2025-04-17 06:26:05 +02:00
35 changed files with 1002 additions and 2021 deletions

View File

@ -1,4 +1,4 @@
name: build
name: ci-build
run-name: Building ${{ github.ref_name }}
on:
push:
@ -87,4 +87,3 @@ jobs:
args: release --clean
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
HOMEBREW_TOKEN: ${{ secrets.HOMEBREW_TAP_GITHUB_TOKEN }}

View File

@ -1,89 +0,0 @@
name: codeql
on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
schedule:
- cron: '36 23 * * 3'
jobs:
analyze:
name: Analyze (${{ matrix.language }})
# Runner size impacts CodeQL analysis time. To learn more, please see:
# - https://gh.io/recommended-hardware-resources-for-running-codeql
# - https://gh.io/supported-runners-and-hardware-resources
# - https://gh.io/using-larger-runners (GitHub.com only)
# Consider using larger runners or machines with greater resources for possible analysis time improvements.
runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }}
permissions:
# required for all workflows
security-events: write
# required to fetch internal or private CodeQL packs
packages: read
# only required for workflows in private repositories
actions: read
contents: read
strategy:
fail-fast: false
matrix:
include:
- language: actions
build-mode: none
- language: go
build-mode: autobuild
# CodeQL supports the following values keywords for 'language': 'actions', 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift'
# Use `c-cpp` to analyze code written in C, C++ or both
# Use 'java-kotlin' to analyze code written in Java, Kotlin or both
# Use 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both
# To learn more about changing the languages that are analyzed or customizing the build mode for your analysis,
# see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/customizing-your-advanced-setup-for-code-scanning.
# If you are analyzing a compiled language, you can modify the 'build-mode' for that language to customize how
# your codebase is analyzed, see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/codeql-code-scanning-for-compiled-languages
steps:
- name: Checkout repository
uses: actions/checkout@v4
# Add any setup steps before running the `github/codeql-action/init` action.
# This includes steps like installing compilers or runtimes (`actions/setup-node`
# or others). This is typically only required for manual builds.
# - name: Setup runtime (example)
# uses: actions/setup-example@v1
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
build-mode: ${{ matrix.build-mode }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality
# If the analyze step fails for one of the languages you are analyzing with
# "We were unable to automatically build your code", modify the matrix above
# to set the build mode to "manual" for that language. Then modify this step
# to build your code.
# Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
- if: matrix.build-mode == 'manual'
shell: bash
run: |
echo 'If you are using a "manual" build mode for one or more of the' \
'languages you are analyzing, replace this with the commands to build' \
'your code, for example:'
echo ' make bootstrap'
echo ' make release'
exit 1
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
with:
category: "/language:${{matrix.language}}"

View File

@ -33,32 +33,18 @@ brews:
repository:
owner: rfwatson
name: homebrew-octoplex
token: "{{ .Env.HOMEBREW_TOKEN }}"
install: |
bin.install "octoplex"
test: |
system "#{bin}/octoplex -h"
release:
draft: true
github:
owner: rfwatson
name: octoplex
changelog:
use: github
groups:
- title: New Features
regexp: '^.*?feat(\([[:word:]]+\))??!?:.+$'
order: 0
- title: "Bug fixes"
regexp: '^.*?fix(\([[:word:]]+\))??!?:.+$'
order: 1
- title: "Refactorings"
regexp: '^.*?refactor(\([[:word:]]+\))??!?:.+$'
order: 2
- title: Others
order: 999
filters:
exclude:
- "^doc:"

View File

@ -1,44 +0,0 @@
# Contributing
Thanks for contributing to Octoplex!
## Development
### Mise
Octoplex uses [mise](https://mise.jdx.dev/installing-mise.html) as a task
runner and environment management tool.
Once installed, you can run common development tasks easily:
Command|Shortcut|Description
---|---|---
`mise run test`|`mise run t`|Run unit tests
`mise run test_integration`|`mise run ti`|Run integration tests
`mise run lint`|`mise run l`|Run linter
`mise run format`|`mise run f`|Run formatter
`mise run generate_mocks`|`mise run m`|Re-generate mocks
### Tests
#### Integration tests
The integration tests (mostly in `/internal/app/integration_test.go`) attempt
to exercise the entire app, including launching containers and rendering the
terminal output.
Sometimes they can be flaky. Always ensure there are no stale Docker containers
present from previous runs, and that nothing is listening or attempting to
broadcast to localhost:1935 or localhost:1936.
## Opening a pull request
Pull requests are welcome, please propose significant changes in a
[discussion](https://github.com/rfwatson/octoplex/discussions) first.
1. Fork the repo
1. Make your changes, including test coverage
1. Run the formatter (`mise run format`)
1. Push the changes to a branch
1. Ensure the branch is passing
1. Open a pull request

View File

@ -1,19 +1,17 @@
# Octoplex :octopus:
[![build status](https://github.com/rfwatson/octoplex/actions/workflows/build.yml/badge.svg)](https://github.com/rfwatson/octoplex/actions/workflows/build.yml)
[![scan status](https://github.com/rfwatson/octoplex/actions/workflows/codeql.yml/badge.svg)](https://github.com/rfwatson/octoplex/actions/workflows/codeql.yml)
![build status](https://github.com/rfwatson/octoplex/actions/workflows/ci-build.yml/badge.svg)
![GitHub Release](https://img.shields.io/github/v/release/rfwatson/octoplex)
[![Go Report Card](https://goreportcard.com/badge/git.netflux.io/rob/octoplex)](https://goreportcard.com/report/git.netflux.io/rob/octoplex)
[![License: AGPL v3](https://img.shields.io/badge/License-AGPL_v3-blue.svg)](https://www.gnu.org/licenses/agpl-3.0)
Octoplex is a Docker-native live video restreamer.
Octoplex is a live video restreamer for the terminal.
* Restream RTMP/RTMPS to unlimited destinations
* Broadcast using OBS or any standard tool
* Add and remove destinations on-the-fly
* Automatic reconnections on drop
* Terminal UI with live metrics and health status
* Powered by FFmpeg, Docker & other open source tools
* Restream RTMP to unlimited destinations
* Broadcast using OBS and other standard tools
* Add and remove destinations while streaming
* Automatic reconnections
* Terminal user interface with real-time container metrics and health status
* Built on FFmpeg, Docker and other proven free software
## How it works
@ -39,7 +37,8 @@ Octoplex is a Docker-native live video restreamer.
### Docker Engine
First, ensure that Docker Engine is installed.
First, make sure Docker Engine is installed. Octoplex uses Docker to manage
FFmpeg and other streaming tools.
Linux: See https://docs.docker.com/engine/install/.
@ -70,43 +69,11 @@ Launch the `octoplex` binary.
$ octoplex
```
### Restreaming with OBS
### Connecting with OBS
#### RTMP
To connect with OBS, configure it to stream to `rtmp://localhost:1935/live`.
Use the following OBS stream configuration:
![OBS streaming settings for RTMP](/assets/obs1.png)
#### RTMPS
Or to connect with RTMPS:
![OBS streaming settings for RTMPS](/assets/obs2.png)
:warning: Warning: OBS may not accept selfsigned certificates.
If you see the error
> "The RTMP server sent an invalid SSL certificate."
then either install a CAsigned cert for your RTMPS host, or import your
selfsigned cert into your OSs trusted store. See the
[configuration](#Configuration) section below.
### Restreaming with FFmpeg
#### RTMP
```
$ ffmpeg -i input.mp4 -c copy -f flv rtmp://localhost:1935/live
```
#### RTMPS
```
$ ffmpeg -i input.mp4 -c copy -f flv rtmps://localhost:1936/live
```
![OBS streaming settings](/assets/obs1.png)
### Subcommands
@ -118,7 +85,7 @@ None|Launch the terminal user interface
`version`|Print the version
`help`|Print help screen
### Configuration
### Configuration file
Octoplex stores configuration state in a simple YAML file. (See [above](#subcommands) for its location.)
@ -129,20 +96,9 @@ logfile:
enabled: true # defaults to false
path: /path/to/logfile # defaults to $XDG_STATE_HOME/octoplex/octoplex.log
sources:
mediaServer:
rtmp:
enabled: true # must be true
streamKey: live # defaults to "live"
host: rtmp.example.com # defaults to "localhost"
tls: # optional TLS settings; RTMPS support is automatic.
cert: /etc/mycert.pem # If you omit cert/key, a self-signed keypair will be
key: /etc/mykey.pem # generated using the `host` value above.
rtmp:
enabled: true # defaults to false
ip: 127.0.0.1 # defaults to 127.0.0.1
port: 1935 # defaults to 1935
rtmps:
enabled: true # defaults to false
ip: 0.0.0.0 # defaults to 127.0.0.1
port: 1936 # defaults to 1936
destinations:
- name: YouTube # Destination name, used only for display
url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key
@ -151,21 +107,19 @@ destinations:
# other destinations here
```
:information_source: It is also possible to add and remove destinations directly from the
:warning: It is also possible to add and remove destinations directly from the
terminal user interface.
:warning: `sources.mediaServer.rtmp.ip` must be set to a valid IP address if
you want to accept connections from other hosts. Leave it blank to bind only to
localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces.
## Contributing
See [CONTRIBUTING.md](/CONTRIBUTING.md).
### Bug reports
Open bug reports [on GitHub](https://github.com/rfwatson/octoplex/issues/new).
### Pull requests
Pull requests are welcome.
## Acknowledgements
Octoplex is built on and/or makes use of other free and open source software,

View File

@ -1,11 +0,0 @@
# Security
## Supported versions
Octoplex is currently alpha software. Security updates will be targeted to the
`main` branch.
## Reporting a vulnerability
Please report any vulnerability privately through GitHub:
https://github.com/rfwatson/octoplex/security

Binary file not shown.

Before

Width:  |  Height:  |  Size: 38 KiB

31
go.mod
View File

@ -3,12 +3,12 @@ module git.netflux.io/rob/octoplex
go 1.24.0
require (
github.com/docker/docker v28.1.1+incompatible
github.com/docker/docker v28.0.1+incompatible
github.com/docker/go-connections v0.5.0
github.com/gdamore/tcell/v2 v2.8.1
github.com/google/go-cmp v0.7.0
github.com/opencontainers/image-spec v1.1.1
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922
github.com/rivo/tview v0.0.0-20241227133733-17b7edb88c57
github.com/stretchr/testify v1.10.0
github.com/testcontainers/testcontainers-go v0.35.0
golang.design/x/clipboard v0.7.0
@ -17,6 +17,7 @@ require (
require (
dario.cat/mergo v1.0.0 // indirect
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 // indirect
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
@ -40,7 +41,7 @@ require (
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jinzhu/copier v0.4.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.9 // indirect
@ -50,11 +51,9 @@ require (
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/go-archive v0.1.0 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/atomicwriter v0.1.0 // indirect
github.com/moby/sys/sequential v0.6.0 // indirect
github.com/moby/sys/user v0.4.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/moby/sys/userns v0.1.0 // indirect
github.com/moby/term v0.5.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
@ -89,18 +88,18 @@ require (
go.opentelemetry.io/otel/metric v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac // indirect
golang.org/x/exp/shiny v0.0.0-20250408133849-7e4ce0ab07d0 // indirect
golang.org/x/image v0.26.0 // indirect
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7 // indirect
golang.org/x/exp/shiny v0.0.0-20250305212735-054e65f0b394 // indirect
golang.org/x/image v0.25.0 // indirect
golang.org/x/mobile v0.0.0-20250305212854-3a7bc9f8a4de // indirect
golang.org/x/mod v0.24.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/term v0.31.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/sync v0.12.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/term v0.30.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/tools v0.32.0 // indirect
golang.org/x/tools v0.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

68
go.sum
View File

@ -26,8 +26,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/docker/docker v28.1.1+incompatible h1:49M11BFLsVO1gxY9UX9p/zwkE/rswggs8AdFmXQw51I=
github.com/docker/docker v28.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v28.0.1+incompatible h1:FCHjSRdXhNRFjlHMTv4jUNlIBbTeRjrWfeFuJp7jpo0=
github.com/docker/docker v28.0.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
@ -73,8 +73,8 @@ github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@ -100,16 +100,12 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/go-archive v0.1.0 h1:Kk/5rdW/g+H8NHdJW2gsXyZ7UnzvJNOy6VKJqueWdcQ=
github.com/moby/go-archive v0.1.0/go.mod h1:G9B+YoujNohJmrIYFBpSd54GTUB4lt9S+xVQvsJyFuo=
github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk=
github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc=
github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw=
github.com/moby/sys/atomicwriter v0.1.0/go.mod h1:Ul8oqv2ZMNHOceF643P6FKPXeCmYtlQMvpizfsSoaWs=
github.com/moby/sys/sequential v0.6.0 h1:qrx7XFUd/5DxtqcoH1h438hF5TmOvzC/lspjy7zgvCU=
github.com/moby/sys/sequential v0.6.0/go.mod h1:uyv8EUTrca5PnDsdMGXhZe6CCe8U/UiTWd+lL+7b/Ko=
github.com/moby/sys/user v0.4.0 h1:jhcMKit7SA80hivmFJcbB1vqmw//wU61Zdui2eQXuMs=
github.com/moby/sys/user v0.4.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs=
github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc=
github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo=
github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg=
github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU=
github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g=
github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28=
github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ=
@ -129,8 +125,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922 h1:SMyqkaRfpE8ZQUSRTZKO3uN84xov++OGa+e3NCksaQw=
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922/go.mod h1:02iFIz7K/A9jGCvrizLPvoqr4cEIx7q54RH5Qudkrss=
github.com/rivo/tview v0.0.0-20241227133733-17b7edb88c57 h1:LmsF7Fk5jyEDhJk0fYIqdWNuTxSyid2W42A0L2YWjGE=
github.com/rivo/tview v0.0.0-20241227133733-17b7edb88c57/go.mod h1:02iFIz7K/A9jGCvrizLPvoqr4cEIx7q54RH5Qudkrss=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
@ -223,16 +219,16 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac h1:l5+whBCLH3iH2ZNHYLbAe58bo7yrN4mVcnkHDYz5vvs=
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac/go.mod h1:hH+7mtFmImwwcMvScyxUhjuVHR3HGaDPMn9rMSUUbxo=
golang.org/x/exp/shiny v0.0.0-20250408133849-7e4ce0ab07d0 h1:tMSqXTK+AQdW3LpCbfatHSRPHeW6+2WuxaVQuHftn80=
golang.org/x/exp/shiny v0.0.0-20250408133849-7e4ce0ab07d0/go.mod h1:ygj7T6vSGhhm/9yTpOQQNvuAUFziTH7RUiH74EoE2C8=
golang.org/x/image v0.26.0 h1:4XjIFEZWQmCZi6Wv8BoxsDhRU3RVnLX04dToTDAEPlY=
golang.org/x/image v0.26.0/go.mod h1:lcxbMFAovzpnJxzXS3nyL83K27tmqtKzIJpctK8YO5c=
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7 h1:8MGTx39304caZ/OMsjPfuxUoDGI2tRas92F5x97tIYc=
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7/go.mod h1:ftACcHgQ7vaOnQbHOHvXt9Y6bEPHrs5Ovk67ClwrPJA=
golang.org/x/exp/shiny v0.0.0-20250305212735-054e65f0b394 h1:bFYqOIMdeiCEdzPJkLiOoMDzW/v3tjW4AA/RmUZYsL8=
golang.org/x/exp/shiny v0.0.0-20250305212735-054e65f0b394/go.mod h1:ygj7T6vSGhhm/9yTpOQQNvuAUFziTH7RUiH74EoE2C8=
golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ=
golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs=
golang.org/x/mobile v0.0.0-20250305212854-3a7bc9f8a4de h1:WuckfUoaRGJfaQTPZvlmcaQwg4Xj9oS2cvvh3dUqpDo=
golang.org/x/mobile v0.0.0-20250305212854-3a7bc9f8a4de/go.mod h1:/IZuixag1ELW37+FftdmIt59/3esqpAWM/QqWtf7HUI=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
@ -253,8 +249,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -264,8 +260,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -287,8 +283,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@ -298,8 +294,8 @@ golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o=
golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw=
golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y=
golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
@ -309,8 +305,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@ -321,8 +317,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/tools v0.32.0 h1:Q7N1vhpkQv7ybVzLFtTjvQya2ewbwNDZzUgfXGqtMWU=
golang.org/x/tools v0.32.0/go.mod h1:ZxrU41P/wAbZD8EDa6dDCa6XfpkhJ7HFMjHJXfBDu8s=
golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU=
golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -344,5 +340,5 @@ gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q=
gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=

View File

@ -1,7 +1,6 @@
package app
import (
"cmp"
"context"
"errors"
"fmt"
@ -12,87 +11,50 @@ import (
"git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
"git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/replicator"
"git.netflux.io/rob/octoplex/internal/terminal"
"github.com/docker/docker/client"
)
// App is an instance of the app.
type App struct {
cfg config.Config
configService *config.Service
eventBus *event.Bus
dispatchC chan event.Command
dockerClient container.DockerClient
screen *terminal.Screen // Screen may be nil.
headless bool
clipboardAvailable bool
configFilePath string
buildInfo domain.BuildInfo
logger *slog.Logger
}
// Params holds the parameters for running the application.
type Params struct {
// RunParams holds the parameters for running the application.
type RunParams struct {
ConfigService *config.Service
DockerClient container.DockerClient
ChanSize int
Screen *terminal.Screen // Screen may be nil.
Headless bool
ClipboardAvailable bool
ConfigFilePath string
BuildInfo domain.BuildInfo
Logger *slog.Logger
}
// defaultChanSize is the default size of the dispatch channel.
const defaultChanSize = 64
// New creates a new application instance.
func New(params Params) *App {
return &App{
cfg: params.ConfigService.Current(),
configService: params.ConfigService,
eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
dockerClient: params.DockerClient,
screen: params.Screen,
headless: params.Headless,
clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo,
logger: params.Logger,
}
}
// Run starts the application, and blocks until it exits.
func (a *App) Run(ctx context.Context) error {
func Run(ctx context.Context, params RunParams) error {
// cfg is the current configuration of the application, as reflected in the
// config file.
cfg := params.ConfigService.Current()
// state is the current state of the application, as reflected in the UI.
state := new(domain.AppState)
applyConfig(a.cfg, state)
applyConfig(cfg, state)
// Ensure there is at least one active source.
if !a.cfg.Sources.MediaServer.RTMP.Enabled && !a.cfg.Sources.MediaServer.RTMPS.Enabled {
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
// While RTMP is the only source, it doesn't make sense to disable it.
if !cfg.Sources.RTMP.Enabled {
return errors.New("config: sources.rtmp.enabled must be set to true")
}
if !a.headless {
ui, err := terminal.StartUI(ctx, terminal.StartParams{
EventBus: a.eventBus,
Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd },
Screen: a.screen,
ClipboardAvailable: a.clipboardAvailable,
ConfigFilePath: a.configFilePath,
BuildInfo: a.buildInfo,
Logger: a.logger.With("component", "ui"),
})
if err != nil {
return fmt.Errorf("start terminal user interface: %w", err)
}
defer ui.Close()
logger := params.Logger
ui, err := terminal.StartUI(ctx, terminal.StartParams{
Screen: params.Screen,
ClipboardAvailable: params.ClipboardAvailable,
ConfigFilePath: params.ConfigFilePath,
BuildInfo: params.BuildInfo,
Logger: logger.With("component", "ui"),
})
if err != nil {
return fmt.Errorf("start terminal user interface: %w", err)
}
defer ui.Close()
// emptyUI is a dummy function that sets the UI state to an empty state, and
// re-renders the screen.
@ -103,64 +65,39 @@ func (a *App) Run(ctx context.Context) error {
// It is only needed for integration tests when rendering modals before the
// main loop starts. It would be nice to remove this but the risk/impact on
// non-test code is pretty low.
emptyUI := func() {
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
}
emptyUI := func() { ui.SetState(domain.AppState{}) }
// doFatalError publishes a fatal error to the event bus, waiting for the
// user to acknowledge it if not in headless mode.
doFatalError := func(msg string) {
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
if a.headless {
return
}
emptyUI()
<-a.dispatchC
}
containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client"))
containerClient, err := container.NewClient(ctx, params.DockerClient, logger.With("component", "container_client"))
if err != nil {
err = fmt.Errorf("create container client: %w", err)
var msg string
var errString string
if client.IsErrConnectionFailed(err) {
msg = "Could not connect to Docker. Is Docker installed and running?"
errString = "Could not connect to Docker. Is Docker installed and running?"
} else {
msg = err.Error()
errString = err.Error()
}
doFatalError(msg)
ui.ShowFatalErrorModal(errString)
emptyUI()
<-ui.C()
return err
}
defer containerClient.Close()
updateUI := func() {
// The state is mutable so can't be passed into another goroutine
// without cloning it first.
a.eventBus.Send(event.AppStateChangedEvent{State: state.Clone()})
}
updateUI := func() { ui.SetState(*state) }
updateUI()
var tlsCertPath, tlsKeyPath string
if a.cfg.Sources.MediaServer.TLS != nil {
tlsCertPath = a.cfg.Sources.MediaServer.TLS.CertPath
tlsKeyPath = a.cfg.Sources.MediaServer.TLS.KeyPath
}
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
RTMPAddr: buildNetAddr(a.cfg.Sources.MediaServer.RTMP),
RTMPSAddr: buildNetAddr(a.cfg.Sources.MediaServer.RTMPS),
Host: a.cfg.Sources.MediaServer.Host,
TLSCertPath: tlsCertPath,
TLSKeyPath: tlsKeyPath,
StreamKey: mediaserver.StreamKey(a.cfg.Sources.MediaServer.StreamKey),
StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey),
ContainerClient: containerClient,
Logger: a.logger.With("component", "mediaserver"),
Logger: logger.With("component", "mediaserver"),
})
if err != nil {
err = fmt.Errorf("create mediaserver: %w", err)
doFatalError(err.Error())
ui.ShowFatalErrorModal(err.Error())
emptyUI()
<-ui.C()
return err
}
defer srv.Close()
@ -168,7 +105,7 @@ func (a *App) Run(ctx context.Context) error {
repl := replicator.StartActor(ctx, replicator.StartActorParams{
SourceURL: srv.RTMPInternalURL(),
ContainerClient: containerClient,
Logger: a.logger.With("component", "replicator"),
Logger: logger.With("component", "replicator"),
})
defer repl.Close()
@ -176,55 +113,85 @@ func (a *App) Run(ctx context.Context) error {
uiUpdateT := time.NewTicker(uiUpdateInterval)
defer uiUpdateT.Stop()
startMediaServerC := make(chan struct{}, 1)
if a.headless { // disable startup check in headless mode for now
startMediaServerC <- struct{}{}
} else {
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
doFatalError(startupErr.Error())
return startupErr
} else if ok {
startMediaServerC <- struct{}{}
}
}
startupCheckC := doStartupCheck(ctx, containerClient, ui.ShowStartupCheckModal)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-startMediaServerC:
if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err)
}
a.eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()})
case <-a.configService.C():
// No-op, config updates are handled synchronously for now.
case cmd := <-a.dispatchC:
if _, err := a.handleCommand(ctx, cmd, state, repl, containerClient, startMediaServerC); errors.Is(err, errExit) {
case err := <-startupCheckC:
if errors.Is(err, errStartupCheckUserQuit) {
return nil
} else if err != nil {
return fmt.Errorf("handle command: %w", err)
return fmt.Errorf("startup check: %w", err)
} else {
startupCheckC = nil
if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err)
}
}
case <-params.ConfigService.C():
// No-op, config updates are handled synchronously for now.
case cmd, ok := <-ui.C():
if !ok {
// TODO: keep UI open until all containers have closed
logger.Info("UI closed")
return nil
}
logger.Debug("Command received", "cmd", cmd.Name())
switch c := cmd.(type) {
case terminal.CommandAddDestination:
newCfg := cfg
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
Name: c.DestinationName,
URL: c.URL,
})
if err := params.ConfigService.SetConfig(newCfg); err != nil {
logger.Error("Config update failed", "err", err)
ui.ConfigUpdateFailed(err)
continue
}
cfg = newCfg
handleConfigUpdate(cfg, state, ui)
ui.DestinationAdded()
case terminal.CommandRemoveDestination:
repl.StopDestination(c.URL) // no-op if not live
newCfg := cfg
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
return dest.URL == c.URL
})
if err := params.ConfigService.SetConfig(newCfg); err != nil {
logger.Error("Config update failed", "err", err)
ui.ConfigUpdateFailed(err)
continue
}
cfg = newCfg
handleConfigUpdate(cfg, state, ui)
ui.DestinationRemoved()
case terminal.CommandStartDestination:
if !state.Source.Live {
ui.ShowSourceNotLiveModal()
continue
}
repl.StartDestination(c.URL)
case terminal.CommandStopDestination:
repl.StopDestination(c.URL)
case terminal.CommandQuit:
return nil
}
case <-uiUpdateT.C:
updateUI()
case serverState := <-srv.C():
a.logger.Debug("Server state received", "state", serverState)
if serverState.ExitReason != "" {
doFatalError(serverState.ExitReason)
return errors.New("media server exited")
}
logger.Debug("Server state received", "state", serverState)
applyServerState(serverState, state)
updateUI()
case replState := <-repl.C():
a.logger.Debug("Replicator state received", "state", replState)
logger.Debug("Replicator state received", "state", replState)
destErrors := applyReplicatorState(replState, state)
for _, destError := range destErrors {
a.eventBus.Send(event.DestinationStreamExitedEvent{Name: destError.name, Err: destError.err})
repl.StopDestination(destError.url)
handleDestError(destError, repl, ui)
}
updateUI()
@ -232,100 +199,10 @@ func (a *App) Run(ctx context.Context) error {
}
}
type syncCommand struct {
event.Command
done chan<- event.Event
}
// Dispatch dispatches a command to be executed synchronously.
func (a *App) Dispatch(cmd event.Command) event.Event {
ch := make(chan event.Event, 1)
a.dispatchC <- syncCommand{Command: cmd, done: ch}
return <-ch
}
// errExit is an error that indicates the app should exit.
var errExit = errors.New("exit")
// handleCommand handles an incoming command. It may return an Event which will
// already have been published to the event bus, but which is returned for the
// benefit of synchronous callers. The event may be nil. It may also publish
// other events to the event bus which are not returned. Currently the only
// error that may be returned is [errExit], which indicates to the main event
// loop that the app should exit.
func (a *App) handleCommand(
ctx context.Context,
cmd event.Command,
state *domain.AppState,
repl *replicator.Actor,
containerClient *container.Client,
startMediaServerC chan struct{},
) (evt event.Event, _ error) {
a.logger.Debug("Command received", "cmd", cmd.Name())
defer func() {
if evt != nil {
a.eventBus.Send(evt)
}
if c, ok := cmd.(syncCommand); ok {
c.done <- evt
}
}()
switch c := cmd.(type) {
case event.CommandAddDestination:
newCfg := a.cfg
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
Name: c.DestinationName,
URL: c.URL,
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Add destination failed", "err", err)
return event.AddDestinationFailedEvent{Err: err}, nil
}
a.cfg = newCfg
a.handleConfigUpdate(state)
a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL})
case event.CommandRemoveDestination:
repl.StopDestination(c.URL) // no-op if not live
newCfg := a.cfg
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
return dest.URL == c.URL
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Remove destination failed", "err", err)
a.eventBus.Send(event.RemoveDestinationFailedEvent{Err: err})
break
}
a.cfg = newCfg
a.handleConfigUpdate(state)
a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL}) //nolint:gosimple
case event.CommandStartDestination:
if !state.Source.Live {
a.eventBus.Send(event.StartDestinationFailedEvent{})
break
}
repl.StartDestination(c.URL)
case event.CommandStopDestination:
repl.StopDestination(c.URL)
case event.CommandCloseOtherInstance:
if err := closeOtherInstances(ctx, containerClient); err != nil {
return nil, fmt.Errorf("close other instances: %w", err)
}
startMediaServerC <- struct{}{}
case event.CommandQuit:
return nil, errExit
}
return nil, nil
}
// handleConfigUpdate applies the config to the app state, and sends an AppStateChangedEvent.
func (a *App) handleConfigUpdate(appState *domain.AppState) {
applyConfig(a.cfg, appState)
a.eventBus.Send(event.AppStateChangedEvent{State: appState.Clone()})
// handleConfigUpdate applies the config to the app state, and updates the UI.
func handleConfigUpdate(cfg config.Config, appState *domain.AppState, ui *terminal.UI) {
applyConfig(cfg, appState)
ui.SetState(*appState)
}
// applyServerState applies the current server state to the app state.
@ -371,6 +248,13 @@ func applyReplicatorState(replState replicator.State, appState *domain.AppState)
return errorsToDisplay
}
// handleDestError displays a modal to the user, and stops the destination.
func handleDestError(destError destinationError, repl *replicator.Actor, ui *terminal.UI) {
ui.ShowDestinationErrorModal(destError.name, destError.err)
repl.StopDestination(destError.url)
}
// applyConfig applies the config to the app state. For now we only set the
// destinations.
func applyConfig(cfg config.Config, appState *domain.AppState) {
@ -400,41 +284,38 @@ func resolveDestinations(destinations []domain.Destination, inDestinations []con
return destinations[:len(inDestinations)]
}
var errStartupCheckUserQuit = errors.New("user quit startup check modal")
// doStartupCheck performs a startup check to see if there are any existing app
// containers.
//
// It returns a bool if the check is clear. If the bool is false, then
// startup should be paused until the choice selected by the user is received
// via a command.
func doStartupCheck(ctx context.Context, containerClient *container.Client, eventBus *event.Bus) (bool, error) {
if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
return false, fmt.Errorf("check existing containers: %w", err)
} else if exists {
eventBus.Send(event.OtherInstanceDetectedEvent{})
// It returns a channel that will be closed, possibly after receiving an error.
// If the error is non-nil the app must not be started. If the error is
// [errStartupCheckUserQuit], the user voluntarily quit the startup check
// modal.
func doStartupCheck(ctx context.Context, containerClient *container.Client, showModal func() bool) <-chan error {
ch := make(chan error, 1)
return false, nil
}
go func() {
defer close(ch)
return true, nil
}
func closeOtherInstances(ctx context.Context, containerClient *container.Client) error {
if err := containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
return fmt.Errorf("remove existing containers: %w", err)
}
if err := containerClient.RemoveUnusedNetworks(ctx); err != nil {
return fmt.Errorf("remove unused networks: %w", err)
}
return nil
}
// buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config.
func buildNetAddr(src config.RTMPSource) mediaserver.OptionalNetAddr {
if !src.Enabled {
return mediaserver.OptionalNetAddr{Enabled: false}
}
return mediaserver.OptionalNetAddr{Enabled: true, NetAddr: domain.NetAddr(src.NetAddr)}
if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
ch <- fmt.Errorf("check existing containers: %w", err)
} else if exists {
if showModal() {
if err = containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
ch <- fmt.Errorf("remove existing containers: %w", err)
return
}
if err = containerClient.RemoveUnusedNetworks(ctx); err != nil {
ch <- fmt.Errorf("remove unused networks: %w", err)
return
}
} else {
ch <- errStartupCheckUserQuit
}
}
}()
return ch
}

View File

@ -31,10 +31,10 @@ func buildAppParams(
screen tcell.SimulationScreen,
screenCaptureC chan<- terminal.ScreenCapture,
logger *slog.Logger,
) app.Params {
) app.RunParams {
t.Helper()
return app.Params{
return app.RunParams{
ConfigService: configService,
DockerClient: dockerClient,
Screen: &terminal.Screen{
@ -85,8 +85,6 @@ func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- termina
lines[y] += string(screenCells[n].Runes[0])
}
require.GreaterOrEqual(t, len(lines), 5, "Screen contents should have at least 5 lines")
return lines
}
@ -150,36 +148,25 @@ func printScreen(t *testing.T, getContents func() []string, label string) {
func sendKey(t *testing.T, screen tcell.SimulationScreen, key tcell.Key, ch rune) {
t.Helper()
const (
waitTime = 50 * time.Millisecond
maxTries = 50
)
for i := 0; i < maxTries; i++ {
if err := screen.PostEvent(tcell.NewEventKey(key, ch, tcell.ModNone)); err != nil {
time.Sleep(waitTime)
} else {
return
}
}
t.Fatalf("Failed to send key event after %d tries", maxTries)
screen.InjectKey(key, ch, tcell.ModNone)
time.Sleep(50 * time.Millisecond)
}
func sendKeys(t *testing.T, screen tcell.SimulationScreen, keys string) {
t.Helper()
for _, ch := range keys {
sendKey(t, screen, tcell.KeyRune, ch)
}
screen.InjectKeyBytes([]byte(keys))
time.Sleep(500 * time.Millisecond)
}
func sendBackspaces(t *testing.T, screen tcell.SimulationScreen, n int) {
t.Helper()
for range n {
sendKey(t, screen, tcell.KeyBackspace, 0)
screen.InjectKey(tcell.KeyBackspace, ' ', tcell.ModNone)
time.Sleep(50 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
}
// kickFirstRTMPConn kicks the first RTMP connection from the mediaMTX server.

File diff suppressed because it is too large Load Diff

View File

@ -1,17 +0,0 @@
# openssl req -x509 -nodes -days 3650 -newkey rsa:2048 -keyout server.key -out server.crt -config openssl.cnf
[req]
default_bits = 2048
prompt = no
default_md = sha256
distinguished_name = dn
x509_extensions = v3_req
[dn]
CN = localhost
[v3_req]
subjectAltName = @alt_names
[alt_names]
DNS.1 = localhost

View File

@ -1,18 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIC7TCCAdWgAwIBAgIUTeqv46R19q+BS2e4DBkbIHuWyIIwDQYJKoZIhvcNAQEL
BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTI1MDQyMDA4NTMwN1oXDTM1MDQx
ODA4NTMwN1owFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEA0v/KndfKfG8XItStHeMQ/3z1r8vhkH9KGpfSwDMp8MdH
Mox6vcAsIIr1RFKmalQQg+T+TK9v3XM6F4sJ+WPyb5/31xLUqG6zivitrMy1AZ8w
XLgAz/CTufXL3OBntDwg29QXWt9lOUJyjRa66AQqreTlItuLG65bswfPA4g35f+U
hyr49paukqnVHRr44GtyiNxlfYCEdQWdOR0EQmZ7y6WNQQhnR8odQyftR2lykf17
MSJ8us4JAgZ2fr1QR+DfX5bCSS/WJ2aO7xxeES40NizBx08qYFami1zXrGMMo35I
SfedCohcok8ZZ1oWL+MfSJ2OLVclDnznDPTx39pZPQIDAQABozcwNTAUBgNVHREE
DTALgglsb2NhbGhvc3QwHQYDVR0OBBYEFCgZah+m2NXkI9biS2vnhNUrd3FiMA0G
CSqGSIb3DQEBCwUAA4IBAQAPbofZIKCm3DnudFnK+LRkdlpMNOyH2zn3g8h8vrfL
Tfi0oBgHb7EYxcHYDanZbcIKracWCfQVze2FRLgNFBWiyhDO4IXe/LpwSnbyLWCh
psbGuyVmEz9CuiyVdIi+CWQs5dBBRUCFg6NE2/r6Diw9LD0fVCVUwkvqopetfp1B
tvA74O0RduLWs+iXNs5XW4sODVkrOmhBbRrP9GRCVqiqVWJka6CzrNdBm0Y9zZMQ
GD/6fEgDaW8YlShoO+e4FwmD2IgIx+m4xamr/cQkWpbOHMxAwv7vP0stfkpyUacW
dh9eJmsDAmgGgdtMJvbIfyR9ilG8D6zwOmSlkF6fDJ3E
-----END CERTIFICATE-----

View File

@ -1,28 +0,0 @@
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDS/8qd18p8bxci
1K0d4xD/fPWvy+GQf0oal9LAMynwx0cyjHq9wCwgivVEUqZqVBCD5P5Mr2/dczoX
iwn5Y/Jvn/fXEtSobrOK+K2szLUBnzBcuADP8JO59cvc4Ge0PCDb1Bda32U5QnKN
FrroBCqt5OUi24sbrluzB88DiDfl/5SHKvj2lq6SqdUdGvjga3KI3GV9gIR1BZ05
HQRCZnvLpY1BCGdHyh1DJ+1HaXKR/XsxIny6zgkCBnZ+vVBH4N9flsJJL9YnZo7v
HF4RLjQ2LMHHTypgVqaLXNesYwyjfkhJ950KiFyiTxlnWhYv4x9InY4tVyUOfOcM
9PHf2lk9AgMBAAECggEAC3E3qaukHW9gz9C8upwvtcsu/6OMzes5N4v4L9gWdCo6
YDFiDpw3SGSAvH3G7Ik2hBCNAdeZt2aiRdiSZ+XVpdwE8rLguWmXbvfhYzeOsVHS
q5SG5r/jIviDX60DsrB4D7PGuHTY5mwGDkSnSiG/tsJs8qD5QD0KWAEaZtSiQ2Sp
kcRbdq13/2tjHyx7nBxEYUFC4EJQjK3cNNV4G7nG2xcfT46uPvFV0+1CQtMpFYhi
IsGaSBhW9gOAheycYxCi+LRdUh1IAnLUyYUenu0o8PoXsHp6KD8eS5RXtfA6THd/
Jr614gdAB2Sffw+bFf6FIBNWa5Jwsg9UtbGtjNdo+QKBgQDrOJ2nj7El6MIqeDHs
1cCeGDKmjB1CYWALLHrwwiwmrvEoeBMiJuMN4epZdQw9hwExa7fNpERI7Ay8s5HD
cdppxgcW7CWChNncbVZ39P+YI9URWC2Q2Y8FBhc9FA0sKpDak0rf5UE63SGjU8/I
FGgwjd1Ln5wws00OsYXBZw1lzwKBgQDlo2kRy6xvrUNAbeggT9OQeg2SdkWqvS3v
NUhBzZkVhJNf1oApNRoAvRMQt+Xt+Euw1pQ+TvdOZQhhqxs/pD/wGdM7rhq9r0+G
itsQ5LvNCxCePbSkbFMLgC8JgNuM3aRqhtsU+Illk9xvCj2nKsd+UUN3NxYgjCqa
evTKSzUfMwKBgFapy1w7EteWxEMFec96ibc1zyORqA4W9l3ni3w87itqdSul4dbJ
YQpyW/eNqm7Y2NWujE/V39rGLYMw3dmWjxQ9g8ssQj2uWN5f4mXb/He/a/cx98fQ
gGMndVRpmNjW7fu6HPIU802Ov5//dySOcDzDZ+8+5TsENLXfLhqtrz/9AoGBALc+
/BQoTFTdlSHv0mEecjwDOZtbZ+KEjggpo5xm/TbPkW7T03eOmU5nkrQvm3qXPYdC
5A8Ioo5bTyHpEZhqcF8frJEeMNaW88XwPjmv3TEVGFC9+s2OZ4Jw6pgRzKEPKSmc
rWyBm9qD8E5nhKVGHOVu4YBbY/va/hBB998Jvr1DAoGBAK5nnswLyQZi0lgpkl1P
ITkmvnQlZBfuqvoD7wcQ3nx/K/mdacsxepRne+U/4+iNzRtd3gU0iccCWUTJl4aB
cFRW1eXWuff+4vmM4JToDevGPXrS0CHE20mATJRZPH+YjZFl0pFSc4/tnjxBnx4y
vgM382WU9N9jIHCCnM6DYsbK
-----END PRIVATE KEY-----

View File

@ -22,37 +22,15 @@ func (l LogFile) GetPath() string {
return cmp.Or(l.Path, l.defaultPath)
}
// NetAddr holds an IP and/or port.
type NetAddr struct {
IP string `yaml:"ip,omitempty"`
Port int `yaml:"port,omitempty"`
}
// RTMPSource holds the configuration for the RTMP source.
type RTMPSource struct {
Enabled bool `yaml:"enabled"`
NetAddr `yaml:",inline"`
}
// TLS holds the TLS configuration.
type TLS struct {
CertPath string `yaml:"cert,omitempty"`
KeyPath string `yaml:"key,omitempty"`
}
// MediaServerSource holds the configuration for the media server source.
type MediaServerSource struct {
StreamKey string `yaml:"streamKey,omitempty"`
Host string `yaml:"host,omitempty"`
TLS *TLS `yaml:"tls,omitempty"`
RTMP RTMPSource `yaml:"rtmp"`
RTMPS RTMPSource `yaml:"rtmps"`
Enabled bool `yaml:"enabled"`
StreamKey string `yaml:"streamKey,omitempty"`
}
// Sources holds the configuration for the sources.
type Sources struct {
MediaServer MediaServerSource `yaml:"mediaServer"`
RTMP RTMPSource `yaml:"rtmp"`
}
// Config holds the configuration for the application.

View File

@ -180,13 +180,10 @@ func (s *Service) writeConfig(cfgBytes []byte) error {
// populateConfigOnBuild is called to set default values for a new, empty
// configuration.
//
// This function may set serialized fields to arbitrary values.
// This function may set exported fields to arbitrary values.
func (s *Service) populateConfigOnBuild(cfg *Config) {
cfg.Sources.MediaServer.StreamKey = "live"
cfg.Sources.MediaServer.RTMP = RTMPSource{
Enabled: true,
NetAddr: NetAddr{IP: "127.0.0.1", Port: 1935},
}
cfg.Sources.RTMP.Enabled = true
cfg.Sources.RTMP.StreamKey = "live"
s.populateConfigOnRead(cfg)
}
@ -194,7 +191,7 @@ func (s *Service) populateConfigOnBuild(cfg *Config) {
// populateConfigOnRead is called to set default values for a configuration
// read from an existing file.
//
// This function should not update any serialized values, which would be a
// This function should not update any exported values, which would be a
// confusing experience for the user.
func (s *Service) populateConfigOnRead(cfg *Config) {
cfg.LogFile.defaultPath = filepath.Join(s.appStateDir, "octoplex.log")

View File

@ -19,9 +19,6 @@ import (
//go:embed testdata/complete.yml
var configComplete []byte
//go:embed testdata/rtmps-only.yml
var configRTMPSOnly []byte
//go:embed testdata/logfile.yml
var configLogfile []byte
@ -47,9 +44,7 @@ func TestConfigServiceCurrent(t *testing.T) {
t.Cleanup(func() { require.NoError(t, os.RemoveAll(systemConfigDir)) })
// Ensure defaults are set:
assert.NotNil(t, service.Current().Sources.MediaServer.RTMP)
assert.Equal(t, "127.0.0.1", service.Current().Sources.MediaServer.RTMP.IP)
assert.Equal(t, 1935, service.Current().Sources.MediaServer.RTMP.Port)
assert.True(t, service.Current().Sources.RTMP.Enabled)
}
func TestConfigServiceCreateConfig(t *testing.T) {
@ -72,9 +67,7 @@ func TestConfigServiceCreateConfig(t *testing.T) {
var readCfg config.Config
require.NoError(t, yaml.Unmarshal(cfgBytes, &readCfg))
assert.NotNil(t, readCfg.Sources.MediaServer.RTMP)
assert.Equal(t, "127.0.0.1", readCfg.Sources.MediaServer.RTMP.IP)
assert.Equal(t, 1935, readCfg.Sources.MediaServer.RTMP.Port)
assert.True(t, readCfg.Sources.RTMP.Enabled, "default values not set")
}
func TestConfigServiceReadConfig(t *testing.T) {
@ -97,27 +90,9 @@ func TestConfigServiceReadConfig(t *testing.T) {
Path: "test.log",
},
Sources: config.Sources{
MediaServer: config.MediaServerSource{
RTMP: config.RTMPSource{
Enabled: true,
StreamKey: "s3cr3t",
Host: "rtmp.example.com",
TLS: &config.TLS{
CertPath: "/etc/cert.pem",
KeyPath: "/etc/key.pem",
},
RTMP: config.RTMPSource{
Enabled: true,
NetAddr: config.NetAddr{
IP: "0.0.0.0",
Port: 19350,
},
},
RTMPS: config.RTMPSource{
Enabled: true,
NetAddr: config.NetAddr{
IP: "0.0.0.0",
Port: 19443,
},
},
},
},
Destinations: []config.Destination{
@ -133,33 +108,6 @@ func TestConfigServiceReadConfig(t *testing.T) {
)
},
},
{
name: "RTMPS only",
configBytes: configRTMPSOnly,
want: func(t *testing.T, cfg config.Config) {
require.Empty(
t,
gocmp.Diff(
config.Config{
LogFile: config.LogFile{Enabled: true},
Sources: config.Sources{
MediaServer: config.MediaServerSource{
RTMPS: config.RTMPSource{
Enabled: true,
NetAddr: config.NetAddr{
IP: "0.0.0.0",
Port: 1935,
},
},
},
},
},
cfg,
cmpopts.IgnoreUnexported(config.LogFile{}),
),
)
},
},
{
name: "logging enabled, logfile",
configBytes: configLogfile,

View File

@ -3,20 +3,9 @@ logfile:
enabled: true
path: test.log
sources:
mediaServer:
rtmp:
enabled: true
streamKey: s3cr3t
host: rtmp.example.com
tls:
cert: /etc/cert.pem
key: /etc/key.pem
rtmp:
enabled: true
ip: 0.0.0.0
port: 19350
rtmps:
enabled: true
ip: 0.0.0.0
port: 19443
destinations:
- name: my stream
url: rtmp://rtmp.example.com:1935/live

View File

@ -1,9 +0,0 @@
---
logfile:
enabled: true
sources:
mediaServer:
rtmps:
enabled: true
ip: 0.0.0.0
port: 1935

View File

@ -34,7 +34,9 @@ type Source struct {
Container Container
Live bool
LiveChangedAt time.Time
Listeners int
Tracks []string
RTMPURL string
ExitReason string
}
@ -55,27 +57,6 @@ type Destination struct {
URL string
}
// NetAddr holds a network address.
type NetAddr struct {
IP string
Port int
}
// IsZero returns true if the NetAddr is zero value.
func (n NetAddr) IsZero() bool {
return n.IP == "" && n.Port == 0
}
// KeyPair holds a TLS key pair.
type KeyPair struct {
Cert, Key []byte
}
// IsZero returns true if the KeyPair is zero value.
func (k KeyPair) IsZero() bool {
return k.Cert == nil && k.Key == nil
}
// Container status strings.
//
// TODO: refactor to strictly reflect Docker status strings.

View File

@ -31,21 +31,3 @@ func TestAppStateClone(t *testing.T) {
s.Destinations[0].Name = "Twitch"
assert.Equal(t, "YouTube", s2.Destinations[0].Name)
}
func TestNetAddr(t *testing.T) {
var addr domain.NetAddr
assert.True(t, addr.IsZero())
addr.IP = "127.0.0.1"
addr.Port = 3000
assert.False(t, addr.IsZero())
}
func TestKeyPair(t *testing.T) {
var keyPair domain.KeyPair
assert.True(t, keyPair.IsZero())
keyPair.Cert = []byte("cert")
keyPair.Key = []byte("key")
assert.False(t, keyPair.IsZero())
}

View File

@ -1,49 +0,0 @@
package event
import (
"log/slog"
"sync"
)
const defaultChannelSize = 64
// Bus is an event bus.
type Bus struct {
consumers []chan Event
mu sync.Mutex
logger *slog.Logger
}
// NewBus returns a new event bus.
func NewBus(logger *slog.Logger) *Bus {
return &Bus{
logger: logger,
}
}
// Register registers a consumer for all events.
func (b *Bus) Register() <-chan Event {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan Event, defaultChannelSize)
b.consumers = append(b.consumers, ch)
return ch
}
// Send sends an event to all registered consumers.
func (b *Bus) Send(evt Event) {
// The mutex is needed to ensure the backing array of b.consumers cannot be
// modified under our feet. There is probably a more efficient way to do this
// but this should be ok.
b.mu.Lock()
defer b.mu.Unlock()
for _, ch := range b.consumers {
select {
case ch <- evt:
default:
b.logger.Warn("Event dropped", "name", evt.name())
}
}
}

View File

@ -1,29 +0,0 @@
package event_test
import (
"testing"
"git.netflux.io/rob/octoplex/internal/event"
"git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/stretchr/testify/assert"
)
func TestBus(t *testing.T) {
bus := event.NewBus(testhelpers.NewTestLogger(t))
ch1 := bus.Register()
ch2 := bus.Register()
evt := event.MediaServerStartedEvent{
RTMPURL: "rtmp://rtmp.example.com/live",
RTMPSURL: "rtmps://rtmp.example.com/live",
}
go func() {
bus.Send(evt)
bus.Send(evt)
}()
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
}

View File

@ -1,114 +0,0 @@
package event
import "git.netflux.io/rob/octoplex/internal/domain"
type Name string
const (
EventNameAppStateChanged Name = "app_state_changed"
EventNameDestinationAdded Name = "destination_added"
EventNameAddDestinationFailed Name = "add_destination_failed"
EventNameDestinationStreamExited Name = "destination_stream_exited"
EventNameStartDestinationFailed Name = "start_destination_failed"
EventNameDestinationRemoved Name = "destination_removed"
EventNameRemoveDestinationFailed Name = "remove_destination_failed"
EventNameFatalErrorOccurred Name = "fatal_error_occurred"
EventNameOtherInstanceDetected Name = "other_instance_detected"
EventNameMediaServerStarted Name = "media_server_started"
)
// Event represents something which happened in the appllication.
type Event interface {
name() Name
}
// AppStateChangedEvent is emitted when the application state changes.
type AppStateChangedEvent struct {
State domain.AppState
}
func (e AppStateChangedEvent) name() Name {
return EventNameAppStateChanged
}
// DestinationAddedEvent is emitted when a destination is successfully added.
type DestinationAddedEvent struct {
URL string
}
func (e DestinationAddedEvent) name() Name {
return EventNameDestinationAdded
}
// AddDestinationFailedEvent is emitted when a destination fails to be added.
type AddDestinationFailedEvent struct {
Err error
}
func (e AddDestinationFailedEvent) name() Name {
return EventNameAddDestinationFailed
}
// DestinationStreamExitedEvent is emitted when a destination goes off-air unexpectedly.
type DestinationStreamExitedEvent struct {
Name string
Err error
}
func (e DestinationStreamExitedEvent) name() Name {
return EventNameDestinationStreamExited
}
// StartDestinationFailedEvent is emitted when a destination fails to start.
type StartDestinationFailedEvent struct{}
func (e StartDestinationFailedEvent) name() Name {
return EventNameStartDestinationFailed
}
// DestinationRemovedEvent is emitted when a destination is successfully
// removed.
type DestinationRemovedEvent struct {
URL string
}
func (e DestinationRemovedEvent) name() Name {
return EventNameDestinationRemoved
}
// RemoveDestinationFailedEvent is emitted when a destination fails to be
// removed.
type RemoveDestinationFailedEvent struct {
Err error
}
func (e RemoveDestinationFailedEvent) name() Name {
return EventNameRemoveDestinationFailed
}
// FatalErrorOccurredEvent is emitted when a fatal application
// error occurs.
type FatalErrorOccurredEvent struct {
Message string
}
// OtherInstanceDetectedEvent is emitted when the app launches and detects another instance.
type OtherInstanceDetectedEvent struct{}
func (e OtherInstanceDetectedEvent) name() Name {
return EventNameOtherInstanceDetected
}
func (e FatalErrorOccurredEvent) name() Name {
return "fatal_error_occurred"
}
// MediaServerStartedEvent is emitted when the mediaserver component starts successfully.
type MediaServerStartedEvent struct {
RTMPURL string
RTMPSURL string
}
func (e MediaServerStartedEvent) name() Name {
return "media_server_started"
}

View File

@ -8,7 +8,7 @@ import (
"fmt"
"log/slog"
"net/http"
"os"
"strconv"
"time"
typescontainer "github.com/docker/docker/api/types/container"
@ -27,22 +27,14 @@ import (
type StreamKey string
const (
defaultUpdateStateInterval = 5 * time.Second // default interval to update the state of the media server
defaultAPIPort = 9997 // default API host port for the media server
defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security
defaultRTMPPort = 1935 // default RTMP host port for the media server
defaultRTMPSPort = 1936 // default RTMPS host port for the media server
defaultHost = "localhost" // default mediaserver host name
defaultChanSize = 64 // default channel size for asynchronous non-error channels
imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey].
componentName = "mediaserver" // component name, mostly used for Docker labels
httpClientTimeout = time.Second // timeout for outgoing HTTP client requests
configPath = "/mediamtx.yml" // path to the media server config file
tlsInternalCertPath = "/etc/tls-internal.crt" // path to the internal TLS cert
tlsInternalKeyPath = "/etc/tls-internal.key" // path to the internal TLS key
tlsCertPath = "/etc/tls.crt" // path to the custom TLS cert
tlsKeyPath = "/etc/tls.key" // path to the custom TLS key
defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server
defaultAPIPort = 9997 // default API host port for the media server
defaultRTMPPort = 1935 // default RTMP host port for the media server
defaultChanSize = 64 // default channel size for asynchronous non-error channels
imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey].
componentName = "mediaserver" // component name, mostly used for Docker labels
httpClientTimeout = time.Second // timeout for outgoing HTTP client requests
)
// action is an action to be performed by the actor.
@ -50,21 +42,18 @@ type action func()
// Actor is responsible for managing the media server.
type Actor struct {
actorC chan action
stateC chan domain.Source
chanSize int
containerClient *container.Client
rtmpAddr domain.NetAddr
rtmpsAddr domain.NetAddr
apiPort int
host string
streamKey StreamKey
updateStateInterval time.Duration
pass string // password for the media server
keyPairInternal domain.KeyPair // TLS key pair for the media server
keyPairCustom domain.KeyPair // TLS key pair for the media server
logger *slog.Logger
apiClient *http.Client
actorC chan action
stateC chan domain.Source
chanSize int
containerClient *container.Client
apiPort int
rtmpPort int
streamKey StreamKey
fetchIngressStateInterval time.Duration
pass string // password for the media server
tlsCert, tlsKey []byte // TLS cert and key for the media server
logger *slog.Logger
apiClient *http.Client
// mutable state
state *domain.Source
@ -73,215 +62,61 @@ type Actor struct {
// NewActorParams contains the parameters for building a new media server
// actor.
type NewActorParams struct {
RTMPAddr OptionalNetAddr // defaults to disabled, or 127.0.0.1:1935
RTMPSAddr OptionalNetAddr // defaults to disabled, or 127.0.0.1:1936
APIPort int // defaults to 9997
Host string // defaults to "localhost"
TLSCertPath string // defaults to empty
TLSKeyPath string // defaults to empty
StreamKey StreamKey // defaults to "live"
ChanSize int // defaults to 64
UpdateStateInterval time.Duration // defaults to 5 seconds
ContainerClient *container.Client
Logger *slog.Logger
}
// OptionalNetAddr is a wrapper around domain.NetAddr that indicates whether it
// is enabled or not.
type OptionalNetAddr struct {
domain.NetAddr
Enabled bool
APIPort int // defaults to 9997
RTMPPort int // defaults to 1935
StreamKey StreamKey // defaults to "live"
ChanSize int // defaults to 64
FetchIngressStateInterval time.Duration // defaults to 5 seconds
ContainerClient *container.Client
Logger *slog.Logger
}
// NewActor creates a new media server actor.
//
// Callers must consume the state channel exposed via [C].
func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error) {
dnsNames := []string{"localhost"}
if params.Host != "" {
dnsNames = append(dnsNames, params.Host)
}
keyPairInternal, err := generateTLSCert(dnsNames...)
tlsCert, tlsKey, err := generateTLSCert()
if err != nil {
return nil, fmt.Errorf("generate TLS cert: %w", err)
}
var keyPairCustom domain.KeyPair
if params.TLSCertPath != "" {
keyPairCustom.Cert, err = os.ReadFile(params.TLSCertPath)
if err != nil {
return nil, fmt.Errorf("read TLS cert: %w", err)
}
keyPairCustom.Key, err = os.ReadFile(params.TLSKeyPath)
if err != nil {
return nil, fmt.Errorf("read TLS key: %w", err)
}
}
// TODO: custom cert for API?
apiClient, err := buildAPIClient(keyPairInternal.Cert)
apiClient, err := buildAPIClient(tlsCert)
if err != nil {
return nil, fmt.Errorf("build API client: %w", err)
}
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
return &Actor{
rtmpAddr: toRTMPAddr(params.RTMPAddr, defaultRTMPPort),
rtmpsAddr: toRTMPAddr(params.RTMPSAddr, defaultRTMPSPort),
apiPort: cmp.Or(params.APIPort, defaultAPIPort),
host: cmp.Or(params.Host, defaultHost),
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
updateStateInterval: cmp.Or(params.UpdateStateInterval, defaultUpdateStateInterval),
keyPairInternal: keyPairInternal,
keyPairCustom: keyPairCustom,
pass: generatePassword(),
actorC: make(chan action, chanSize),
state: new(domain.Source),
stateC: make(chan domain.Source, chanSize),
chanSize: chanSize,
containerClient: params.ContainerClient,
logger: params.Logger,
apiClient: apiClient,
apiPort: cmp.Or(params.APIPort, defaultAPIPort),
rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort),
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval),
tlsCert: tlsCert,
tlsKey: tlsKey,
pass: generatePassword(),
actorC: make(chan action, chanSize),
state: new(domain.Source),
stateC: make(chan domain.Source, chanSize),
chanSize: chanSize,
containerClient: params.ContainerClient,
logger: params.Logger,
apiClient: apiClient,
}, nil
}
func (a *Actor) Start(ctx context.Context) error {
var portSpecs []string
portSpecs = append(portSpecs, fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort))
if !a.rtmpAddr.IsZero() {
portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935))
}
if !a.rtmpsAddr.IsZero() {
portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpsAddr.IP, a.rtmpsAddr.Port, 1936))
}
exposedPorts, portBindings, err := nat.ParsePortSpecs(portSpecs)
if err != nil {
return fmt.Errorf("parse port specs: %w", err)
}
// Exposed ports are bound to 127.0.0.1 for security.
// TODO: configurable RTMP bind address
apiPortSpec := nat.Port("127.0.0.1:" + strconv.Itoa(a.apiPort) + ":9997")
rtmpPortSpec := nat.Port("127.0.0.1:" + strconv.Itoa(+a.rtmpPort) + ":1935")
exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)})
cfg, err := a.buildServerConfig()
if err != nil {
return fmt.Errorf("build server config: %w", err)
}
// The RTMP URL is passed to the UI via the state.
// This could be refactored, it's not really stateful data.
a.state.RTMPURL = a.RTMPURL()
copyFiles := []container.CopyFileConfig{
{
Path: configPath,
Payload: bytes.NewReader(cfg),
Mode: 0600,
},
{
Path: tlsInternalCertPath,
Payload: bytes.NewReader(a.keyPairInternal.Cert),
Mode: 0600,
},
{
Path: tlsInternalKeyPath,
Payload: bytes.NewReader(a.keyPairInternal.Key),
Mode: 0600,
},
{
Path: "/etc/healthcheckopts.txt",
Payload: bytes.NewReader([]byte(fmt.Sprintf("--user api:%s", a.pass))),
Mode: 0600,
},
}
if !a.keyPairCustom.IsZero() {
copyFiles = append(
copyFiles,
container.CopyFileConfig{
Path: tlsCertPath,
Payload: bytes.NewReader(a.keyPairCustom.Cert),
Mode: 0600,
},
container.CopyFileConfig{
Path: tlsKeyPath,
Payload: bytes.NewReader(a.keyPairCustom.Key),
Mode: 0600,
},
)
}
args := []any{"host", a.host}
if a.rtmpAddr.IsZero() {
args = append(args, "rtmp.enabled", false)
} else {
args = append(args, "rtmp.enabled", true, "rtmp.bind_addr", a.rtmpAddr.IP, "rtmp.bind_port", a.rtmpAddr.Port)
}
if a.rtmpsAddr.IsZero() {
args = append(args, "rtmps.enabled", false)
} else {
args = append(args, "rtmps.enabled", true, "rtmps.bind_addr", a.rtmpsAddr.IP, "rtmps.bind_port", a.rtmpsAddr.Port)
}
a.logger.Info("Starting media server", args...)
containerStateC, errC := a.containerClient.RunContainer(
ctx,
container.RunContainerParams{
Name: componentName,
ChanSize: a.chanSize,
ContainerConfig: &typescontainer.Config{
Image: imageNameMediaMTX,
Hostname: "mediaserver",
Labels: map[string]string{container.LabelComponent: componentName},
Healthcheck: &typescontainer.HealthConfig{
Test: []string{
"CMD",
"curl",
"--fail",
"--silent",
"--cacert", "/etc/tls-internal.crt",
"--config", "/etc/healthcheckopts.txt",
a.healthCheckURL(),
},
Interval: time.Second * 10,
StartPeriod: time.Second * 2,
StartInterval: time.Second * 2,
Retries: 2,
},
ExposedPorts: exposedPorts,
},
HostConfig: &typescontainer.HostConfig{
NetworkMode: "default",
PortBindings: portBindings,
},
NetworkCountConfig: container.NetworkCountConfig{Rx: "eth0", Tx: "eth1"},
Logs: container.LogConfig{Stdout: true},
CopyFiles: copyFiles,
},
)
go a.actorLoop(ctx, containerStateC, errC)
return nil
}
func (a *Actor) buildServerConfig() ([]byte, error) {
// NOTE: Regardless of the user configuration (which mostly affects exposed
// ports and UI rendering) plain RTMP must be enabled at the container level,
// for internal connections.
var encryptionString string
if a.rtmpsAddr.IsZero() {
encryptionString = "no"
} else {
encryptionString = "optional"
}
var certPath, keyPath string
if a.keyPairCustom.IsZero() {
certPath = tlsInternalCertPath
keyPath = tlsInternalKeyPath
} else {
certPath = tlsCertPath
keyPath = tlsKeyPath
}
return yaml.Marshal(
cfg, err := yaml.Marshal(
Config{
LogLevel: "debug",
LogLevel: "info",
LogDestinations: []string{"stdout"},
AuthMethod: "internal",
AuthInternalUsers: []User{
@ -307,21 +142,79 @@ func (a *Actor) buildServerConfig() ([]byte, error) {
Permissions: []UserPermission{{Action: "api"}},
},
},
RTMP: true,
RTMPEncryption: encryptionString,
RTMPAddress: ":1935",
RTMPSAddress: ":1936",
RTMPServerCert: certPath,
RTMPServerKey: keyPath,
API: true,
APIEncryption: true,
APIServerCert: tlsInternalCertPath,
APIServerKey: tlsInternalKeyPath,
API: true,
APIEncryption: true,
APIServerCert: "/etc/tls.crt",
APIServerKey: "/etc/tls.key",
Paths: map[string]Path{
string(a.streamKey): {Source: "publisher"},
},
},
)
if err != nil { // should never happen
return fmt.Errorf("marshal config: %w", err)
}
containerStateC, errC := a.containerClient.RunContainer(
ctx,
container.RunContainerParams{
Name: componentName,
ChanSize: a.chanSize,
ContainerConfig: &typescontainer.Config{
Image: imageNameMediaMTX,
Hostname: "mediaserver",
Labels: map[string]string{container.LabelComponent: componentName},
Healthcheck: &typescontainer.HealthConfig{
Test: []string{
"CMD",
"curl",
"--fail",
"--silent",
"--cacert", "/etc/tls.crt",
"--config", "/etc/healthcheckopts.txt",
a.healthCheckURL(),
},
Interval: time.Second * 10,
StartPeriod: time.Second * 2,
StartInterval: time.Second * 2,
Retries: 2,
},
ExposedPorts: exposedPorts,
},
HostConfig: &typescontainer.HostConfig{
NetworkMode: "default",
PortBindings: portBindings,
},
NetworkCountConfig: container.NetworkCountConfig{Rx: "eth0", Tx: "eth1"},
Logs: container.LogConfig{Stdout: true},
CopyFiles: []container.CopyFileConfig{
{
Path: "/mediamtx.yml",
Payload: bytes.NewReader(cfg),
Mode: 0600,
},
{
Path: "/etc/tls.crt",
Payload: bytes.NewReader(a.tlsCert),
Mode: 0600,
},
{
Path: "/etc/tls.key",
Payload: bytes.NewReader(a.tlsKey),
Mode: 0600,
},
{
Path: "/etc/healthcheckopts.txt",
Payload: bytes.NewReader([]byte(fmt.Sprintf("--user api:%s", a.pass))),
Mode: 0600,
},
},
},
)
go a.actorLoop(ctx, containerStateC, errC)
return nil
}
// C returns a channel that will receive the current state of the media server.
@ -355,8 +248,16 @@ func (s *Actor) Close() error {
// actorLoop is the main loop of the media server actor. It exits when the
// actor is closed, or the parent context is cancelled.
func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Container, errC <-chan error) {
updateStateT := time.NewTicker(s.updateStateInterval)
defer updateStateT.Stop()
fetchStateT := time.NewTicker(s.fetchIngressStateInterval)
defer fetchStateT.Stop()
// fetchTracksT is used to signal that tracks should be fetched from the
// media server, after the stream goes on-air. A short delay is needed due to
// workaround a race condition in the media server.
var fetchTracksT *time.Timer
resetFetchTracksT := func(d time.Duration) { fetchTracksT = time.NewTimer(d) }
resetFetchTracksT(time.Second)
fetchTracksT.Stop()
sendState := func() { s.stateC <- *s.state }
@ -366,7 +267,7 @@ func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Con
s.state.Container = containerState
if s.state.Container.Status == domain.ContainerStatusExited {
updateStateT.Stop()
fetchStateT.Stop()
s.handleContainerExit(nil)
}
@ -385,21 +286,43 @@ func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Con
s.logger.Error("Error from container client", "err", err, "id", shortID(s.state.Container.ID))
}
updateStateT.Stop()
fetchStateT.Stop()
s.handleContainerExit(err)
sendState()
case <-updateStateT.C:
path, err := fetchPath(s.pathURL(string(s.streamKey)), s.apiClient)
case <-fetchStateT.C:
ingressState, err := fetchIngressState(s.rtmpConnsURL(), s.streamKey, s.apiClient)
if err != nil {
s.logger.Error("Error fetching path", "err", err)
s.logger.Error("Error fetching server state", "err", err)
continue
}
if path.Ready != s.state.Live {
s.state.Live = path.Ready
var shouldSendState bool
if ingressState.ready != s.state.Live {
s.state.Live = ingressState.ready
s.state.LiveChangedAt = time.Now()
s.state.Tracks = path.Tracks
resetFetchTracksT(time.Second)
shouldSendState = true
}
if ingressState.listeners != s.state.Listeners {
s.state.Listeners = ingressState.listeners
shouldSendState = true
}
if shouldSendState {
sendState()
}
case <-fetchTracksT.C:
if !s.state.Live {
continue
}
if tracks, err := fetchTracks(s.pathsURL(), s.streamKey, s.apiClient); err != nil {
s.logger.Error("Error fetching tracks", "err", err)
resetFetchTracksT(3 * time.Second)
} else if len(tracks) == 0 {
resetFetchTracksT(time.Second)
} else {
s.state.Tracks = tracks
sendState()
}
case action, ok := <-s.actorC:
@ -429,20 +352,7 @@ func (s *Actor) handleContainerExit(err error) {
// RTMPURL returns the RTMP URL for the media server, accessible from the host.
func (s *Actor) RTMPURL() string {
if s.rtmpAddr.IsZero() {
return ""
}
return fmt.Sprintf("rtmp://%s:%d/%s", s.host, s.rtmpAddr.Port, s.streamKey)
}
// RTMPSURL returns the RTMPS URL for the media server, accessible from the host.
func (s *Actor) RTMPSURL() string {
if s.rtmpsAddr.IsZero() {
return ""
}
return fmt.Sprintf("rtmps://%s:%d/%s", s.host, s.rtmpsAddr.Port, s.streamKey)
return fmt.Sprintf("rtmp://localhost:%d/%s", s.rtmpPort, s.streamKey)
}
// RTMPInternalURL returns the RTMP URL for the media server, accessible from
@ -452,9 +362,15 @@ func (s *Actor) RTMPInternalURL() string {
return fmt.Sprintf("rtmp://mediaserver:1935/%s?user=api&pass=%s", s.streamKey, s.pass)
}
// pathURL returns the URL for fetching a path, accessible from the host.
func (s *Actor) pathURL(path string) string {
return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/get/%s", s.pass, s.apiPort, path)
// rtmpConnsURL returns the URL for fetching RTMP connections, accessible from
// the host.
func (s *Actor) rtmpConnsURL() string {
return fmt.Sprintf("https://api:%s@localhost:%d/v3/rtmpconns/list", s.pass, s.apiPort)
}
// pathsURL returns the URL for fetching paths, accessible from the host.
func (s *Actor) pathsURL() string {
return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/list", s.pass, s.apiPort)
}
// healthCheckURL returns the URL for the health check, accessible from the
@ -480,17 +396,3 @@ func generatePassword() string {
_, _ = rand.Read(p)
return fmt.Sprintf("%x", []byte(p))
}
// toRTMPAddr builds a domain.NetAddr from an OptionalNetAddr, with default
// values set to RTMP default bind config if needed. If the OptionalNetAddr is
// not enabled, a zero value is returned.
func toRTMPAddr(a OptionalNetAddr, defaultPort int) domain.NetAddr {
if !a.Enabled {
return domain.NetAddr{}
}
return domain.NetAddr{
IP: cmp.Or(a.IP, defaultRTMPIP),
Port: cmp.Or(a.Port, defaultPort),
}
}

View File

@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"time"
)
type httpClient interface {
@ -43,37 +44,109 @@ func buildAPIClient(certPEM []byte) (*http.Client, error) {
const userAgent = "octoplex-client"
type apiPath struct {
Name string `json:"name"`
Ready bool `json:"ready"`
Tracks []string `json:"tracks"`
type apiResponse[T any] struct {
Items []T `json:"items"`
}
func fetchPath(apiURL string, httpClient httpClient) (apiPath, error) {
type rtmpConnsResponse struct {
ID string `json:"id"`
CreatedAt time.Time `json:"created"`
State string `json:"state"`
Path string `json:"path"`
BytesReceived int64 `json:"bytesReceived"`
BytesSent int64 `json:"bytesSent"`
RemoteAddr string `json:"remoteAddr"`
}
type ingressStreamState struct {
ready bool
listeners int
}
// TODO: handle pagination
func fetchIngressState(apiURL string, streamKey StreamKey, httpClient httpClient) (state ingressStreamState, _ error) {
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
if err != nil {
return apiPath{}, fmt.Errorf("new request: %w", err)
return state, fmt.Errorf("new request: %w", err)
}
req.Header.Set("User-Agent", userAgent)
httpResp, err := httpClient.Do(req)
if err != nil {
return apiPath{}, fmt.Errorf("do request: %w", err)
return state, fmt.Errorf("do request: %w", err)
}
if httpResp.StatusCode != http.StatusOK {
return apiPath{}, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
return state, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
}
respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return apiPath{}, fmt.Errorf("read body: %w", err)
return state, fmt.Errorf("read body: %w", err)
}
var path apiPath
if err = json.Unmarshal(respBody, &path); err != nil {
return apiPath{}, fmt.Errorf("unmarshal: %w", err)
var resp apiResponse[rtmpConnsResponse]
if err = json.Unmarshal(respBody, &resp); err != nil {
return state, fmt.Errorf("unmarshal: %w", err)
}
return path, nil
for _, conn := range resp.Items {
if conn.Path != string(streamKey) {
continue
}
switch conn.State {
case "publish":
// mediamtx may report a stream as being in publish state via the API,
// but still refuse to serve them due to being unpublished. This seems to
// be a bug, this is a hacky workaround.
state.ready = conn.BytesReceived > 20_000
case "read":
state.listeners++
}
}
return state, nil
}
type path struct {
Name string `json:"name"`
Tracks []string `json:"tracks"`
}
// TODO: handle pagination
func fetchTracks(apiURL string, streamKey StreamKey, httpClient httpClient) ([]string, error) {
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
}
req.Header.Set("User-Agent", userAgent)
httpResp, err := httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("do request: %w", err)
}
if httpResp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
}
respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, fmt.Errorf("read body: %w", err)
}
var resp apiResponse[path]
if err = json.Unmarshal(respBody, &resp); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err)
}
var tracks []string
for _, path := range resp.Items {
if path.Name == string(streamKey) {
tracks = path.Tracks
}
}
return tracks, nil
}

View File

@ -12,14 +12,14 @@ import (
"github.com/stretchr/testify/require"
)
func TestFetchPath(t *testing.T) {
const url = "http://localhost:8989/v3/paths/get/live"
func TestFetchIngressState(t *testing.T) {
const url = "http://localhost:8989/v3/rtmpconns/list"
testCases := []struct {
name string
httpResponse *http.Response
httpError error
wantPath apiPath
wantState ingressStreamState
wantErr error
}{
{
@ -36,20 +36,36 @@ func TestFetchPath(t *testing.T) {
wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"),
},
{
name: "successful response, not ready",
name: "successful response, no streams",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"name":"live","confName":"live","source":null,"ready":false,"readyTime":null,"tracks":[],"bytesReceived":0,"bytesSent":0,"readers":[]}`))),
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":0,"pageCount":0,"items":[]}`))),
},
wantPath: apiPath{Name: "live", Ready: false, Tracks: []string{}},
wantState: ingressStreamState{ready: false, listeners: 0},
},
{
name: "successful response, not yet ready",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"id":"d2953cf8-9cd6-4c30-816f-807b80b6a71f","created":"2025-02-15T08:19:00.616220354Z","remoteAddr":"172.17.0.1:32972","state":"publish","path":"live","query":"","bytesReceived":15462,"bytesSent":3467}]}`))),
},
wantState: ingressStreamState{ready: false, listeners: 0},
},
{
name: "successful response, ready",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"name":"live","confName":"live","source":{"type":"rtmpConn","id":"fd2d79a8-bab9-4141-a1b5-55bd1a8649df"},"ready":true,"readyTime":"2025-04-18T07:44:53.683627506Z","tracks":["H264"],"bytesReceived":254677,"bytesSent":0,"readers":[]}`))),
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"id":"d2953cf8-9cd6-4c30-816f-807b80b6a71f","created":"2025-02-15T08:19:00.616220354Z","remoteAddr":"172.17.0.1:32972","state":"publish","path":"live","query":"","bytesReceived":27832,"bytesSent":3467}]}`))),
},
wantPath: apiPath{Name: "live", Ready: true, Tracks: []string{"H264"}},
wantState: ingressStreamState{ready: true, listeners: 0},
},
{
name: "successful response, ready, with listeners",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":2,"pageCount":1,"items":[{"id":"12668315-0572-41f1-8384-fe7047cc73be","created":"2025-02-15T08:23:43.836589664Z","remoteAddr":"172.17.0.1:40026","state":"publish","path":"live","query":"","bytesReceived":7180753,"bytesSent":3467},{"id":"079370fd-43bb-4798-b079-860cc3159e4e","created":"2025-02-15T08:24:32.396794364Z","remoteAddr":"192.168.48.3:44736","state":"read","path":"live","query":"","bytesReceived":333435,"bytesSent":24243}]}`))),
},
wantState: ingressStreamState{ready: true, listeners: 1},
},
}
@ -63,12 +79,74 @@ func TestFetchPath(t *testing.T) {
})).
Return(tc.httpResponse, tc.httpError)
path, err := fetchPath(url, &httpClient)
state, err := fetchIngressState(url, StreamKey("live"), &httpClient)
if tc.wantErr != nil {
require.EqualError(t, err, tc.wantErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, tc.wantPath, path)
require.Equal(t, tc.wantState, state)
}
})
}
}
func TestFetchTracks(t *testing.T) {
const url = "http://localhost:8989/v3/paths/list"
testCases := []struct {
name string
httpResponse *http.Response
httpError error
wantTracks []string
wantErr error
}{
{
name: "non-200 status",
httpResponse: &http.Response{StatusCode: http.StatusNotFound},
wantErr: errors.New("unexpected status code: 404"),
},
{
name: "unparseable response",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte("invalid json"))),
},
wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"),
},
{
name: "successful response, no tracks",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":[],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))),
},
wantTracks: []string{},
},
{
name: "successful response, tracks",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":["H264","MPEG-4 Audio"],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))),
},
wantTracks: []string{"H264", "MPEG-4 Audio"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var httpClient mocks.HTTPClient
httpClient.
EXPECT().
Do(mock.MatchedBy(func(req *http.Request) bool {
return req.URL.String() == url && req.Method == http.MethodGet
})).
Return(tc.httpResponse, tc.httpError)
tracks, err := fetchTracks(url, StreamKey("live"), &httpClient)
if tc.wantErr != nil {
require.EqualError(t, err, tc.wantErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, tc.wantTracks, tracks)
}
})
}

View File

@ -17,12 +17,8 @@ type Config struct {
APIEncryption bool `yaml:"apiEncryption,omitempty"`
APIServerCert string `yaml:"apiServerCert,omitempty"`
APIServerKey string `yaml:"apiServerKey,omitempty"`
RTMP bool `yaml:"rtmp"`
RTMPEncryption string `yaml:"rtmpEncryption,omitempty"`
RTMP bool `yaml:"rtmp,omitempty"`
RTMPAddress string `yaml:"rtmpAddress,omitempty"`
RTMPSAddress string `yaml:"rtmpsAddress,omitempty"`
RTMPServerCert string `yaml:"rtmpServerCert,omitempty"`
RTMPServerKey string `yaml:"rtmpServerKey,omitempty"`
HLS bool `yaml:"hls"`
RTSP bool `yaml:"rtsp"`
WebRTC bool `yaml:"webrtc"`

View File

@ -10,20 +10,23 @@ import (
"encoding/pem"
"math/big"
"time"
)
"git.netflux.io/rob/octoplex/internal/domain"
type (
tlsCert []byte
tlsKey []byte
)
// generateTLSCert generates a self-signed TLS certificate and private key.
func generateTLSCert(dnsNames ...string) (domain.KeyPair, error) {
func generateTLSCert() (tlsCert, tlsKey, error) {
privKey, err := ecdsa.GenerateKey(elliptic.P384(), rand.Reader)
if err != nil {
return domain.KeyPair{}, err
return nil, nil, err
}
serialNumber, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128))
if err != nil {
return domain.KeyPair{}, err
return nil, nil, err
}
now := time.Now()
@ -37,31 +40,28 @@ func generateTLSCert(dnsNames ...string) (domain.KeyPair, error) {
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
BasicConstraintsValid: true,
DNSNames: dnsNames,
DNSNames: []string{"localhost"},
}
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &privKey.PublicKey, privKey)
if err != nil {
return domain.KeyPair{}, err
return nil, nil, err
}
var certPEM, keyPEM bytes.Buffer
if err = pem.Encode(&certPEM, &pem.Block{Type: "CERTIFICATE", Bytes: certDER}); err != nil {
return domain.KeyPair{}, err
return nil, nil, err
}
privKeyDER, err := x509.MarshalECPrivateKey(privKey)
if err != nil {
return domain.KeyPair{}, err
return nil, nil, err
}
if err := pem.Encode(&keyPEM, &pem.Block{Type: "EC PRIVATE KEY", Bytes: privKeyDER}); err != nil {
return domain.KeyPair{}, err
return nil, nil, err
}
return domain.KeyPair{
Cert: certPEM.Bytes(),
Key: keyPEM.Bytes(),
}, nil
return certPEM.Bytes(), keyPEM.Bytes(), nil
}

View File

@ -12,12 +12,12 @@ import (
)
func TestGenerateTLSCert(t *testing.T) {
keyPair, err := generateTLSCert("localhost", "rtmp.example.com")
certPEM, keyPEM, err := generateTLSCert()
require.NoError(t, err)
require.NotEmpty(t, keyPair.Cert)
require.NotEmpty(t, keyPair.Key)
require.NotEmpty(t, certPEM)
require.NotEmpty(t, keyPEM)
block, _ := pem.Decode(keyPair.Cert)
block, _ := pem.Decode(certPEM)
require.NotNil(t, block, "failed to decode certificate PEM")
cert, err := x509.ParseCertificate(block.Bytes)
@ -33,10 +33,8 @@ func TestGenerateTLSCert(t *testing.T) {
assert.True(t, cert.BasicConstraintsValid, "basic constraints should be valid")
assert.Contains(t, cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth)
assert.Contains(t, cert.ExtKeyUsage, x509.ExtKeyUsageClientAuth)
assert.Contains(t, cert.DNSNames, "localhost", "DNS names should include localhost")
assert.Contains(t, cert.DNSNames, "rtmp.example.com", "DNS names should include rtmp.example.com")
block, _ = pem.Decode(keyPair.Key)
block, _ = pem.Decode(keyPEM)
require.NotNil(t, block, "failed to decode private key PEM")
privKey, err := x509.ParseECPrivateKey(block.Bytes)

View File

@ -1,4 +1,4 @@
package event
package terminal
// CommandAddDestination adds a destination.
type CommandAddDestination struct {
@ -41,14 +41,6 @@ func (c CommandStopDestination) Name() string {
return "stop_destination"
}
// CommandCloseOtherInstance closes the other instance of the application.
type CommandCloseOtherInstance struct{}
// Name implements the Command interface.
func (c CommandCloseOtherInstance) Name() string {
return "close_other_instance"
}
// CommandQuit quits the app.
type CommandQuit struct{}

View File

@ -13,7 +13,6 @@ import (
"time"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
"git.netflux.io/rob/octoplex/internal/shortid"
"github.com/gdamore/tcell/v2"
"github.com/rivo/tview"
@ -21,6 +20,7 @@ import (
)
type sourceViews struct {
url *tview.TextView
status *tview.TextView
tracks *tview.TextView
health *tview.TextView
@ -41,11 +41,9 @@ const (
// UI is responsible for managing the terminal user interface.
type UI struct {
eventBus *event.Bus
dispatch func(event.Command)
commandC chan Command
clipboardAvailable bool
configFilePath string
rtmpURL, rtmpsURL string
buildInfo domain.BuildInfo
logger *slog.Logger
@ -59,7 +57,6 @@ type UI struct {
sourceViews sourceViews
destView *tview.Table
noDestView *tview.TextView
aboutView *tview.Flex
pullProgressModal *tview.Modal
// other mutable state
@ -95,8 +92,7 @@ type ScreenCapture struct {
// StartParams contains the parameters for starting a new terminal user
// interface.
type StartParams struct {
EventBus *event.Bus
Dispatcher func(event.Command)
ChanSize int
Logger *slog.Logger
ClipboardAvailable bool
ConfigFilePath string
@ -104,8 +100,13 @@ type StartParams struct {
Screen *Screen // Screen may be nil.
}
const defaultChanSize = 64
// StartUI starts the terminal user interface.
func StartUI(ctx context.Context, params StartParams) (*UI, error) {
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
commandCh := make(chan Command, chanSize)
app := tview.NewApplication()
var screen tcell.Screen
@ -126,8 +127,8 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
sourceView := tview.NewFlex()
sourceView.SetDirection(tview.FlexColumn)
sourceView.SetBorder(true)
sourceView.SetTitle("Source")
sidebar.AddItem(sourceView, 8, 0, false)
sourceView.SetTitle("Source RTMP server")
sidebar.AddItem(sourceView, 9, 0, false)
leftCol := tview.NewFlex()
leftCol.SetDirection(tview.FlexRow)
@ -136,6 +137,11 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
sourceView.AddItem(leftCol, 9, 0, false)
sourceView.AddItem(rightCol, 0, 1, false)
urlHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerURL)
leftCol.AddItem(urlHeaderTextView, 1, 0, false)
urlTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
rightCol.AddItem(urlTextView, 1, 0, false)
statusHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerStatus)
leftCol.AddItem(statusHeaderTextView, 1, 0, false)
statusTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
@ -170,6 +176,13 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
aboutView.SetDirection(tview.FlexRow)
aboutView.SetBorder(true)
aboutView.SetTitle("Actions")
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]a[-] Add destination"), 1, 0, false)
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Del[-] Remove destination"), 1, 0, false)
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Space[-] Start/stop destination"), 1, 0, false)
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText(""), 1, 0, false)
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]u[-] Copy source RTMP URL"), 1, 0, false)
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
sidebar.AddItem(aboutView, 0, 1, false)
@ -208,8 +221,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
app.EnableMouse(false)
ui := &UI{
eventBus: params.EventBus,
dispatch: params.Dispatcher,
commandC: commandCh,
clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo,
@ -220,6 +232,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
pages: pages,
container: container,
sourceViews: sourceViews{
url: urlTextView,
status: statusTextView,
tracks: tracksTextView,
health: healthTextView,
@ -229,7 +242,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
},
destView: destView,
noDestView: noDestView,
aboutView: aboutView,
pullProgressModal: pullProgressModal,
urlsToStartState: make(map[string]startState),
}
@ -242,37 +254,13 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
return ui, nil
}
func (ui *UI) renderAboutView() {
ui.aboutView.Clear()
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]a[-] Add destination"), 1, 0, false)
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Del[-] Remove destination"), 1, 0, false)
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Space[-] Start/stop destination"), 1, 0, false)
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText(""), 1, 0, false)
i := 1
if ui.rtmpURL != "" {
rtmpURLView := tview.NewTextView().SetDynamicColors(true).SetText(fmt.Sprintf("[grey]F%d[-] Copy source RTMP URL", i))
ui.aboutView.AddItem(rtmpURLView, 1, 0, false)
i++
}
if ui.rtmpsURL != "" {
rtmpsURLView := tview.NewTextView().SetDynamicColors(true).SetText(fmt.Sprintf("[grey]F%d[-] Copy source RTMPS URL", i))
ui.aboutView.AddItem(rtmpsURLView, 1, 0, false)
}
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
// C returns a channel that receives commands from the user interface.
func (ui *UI) C() <-chan Command {
return ui.commandC
}
func (ui *UI) run(ctx context.Context) {
defer func() {
// Ensure the application is stopped when the UI is closed.
ui.dispatch(event.CommandQuit{})
}()
eventC := ui.eventBus.Register()
defer close(ui.commandC)
uiDone := make(chan struct{})
go func() {
@ -287,34 +275,6 @@ func (ui *UI) run(ctx context.Context) {
for {
select {
case evt := <-eventC:
ui.app.QueueUpdateDraw(func() {
switch evt := evt.(type) {
case event.AppStateChangedEvent:
ui.handleAppStateChanged(evt)
case event.DestinationAddedEvent:
ui.handleDestinationAdded(evt)
case event.StartDestinationFailedEvent:
ui.handleStartDestinationFailed(evt)
case event.AddDestinationFailedEvent:
ui.handleDestinationEventError(evt.Err)
case event.DestinationStreamExitedEvent:
ui.handleDestinationStreamExited(evt)
case event.DestinationRemovedEvent:
ui.handleDestinationRemoved(evt)
case event.RemoveDestinationFailedEvent:
ui.handleDestinationEventError(evt.Err)
case event.OtherInstanceDetectedEvent:
ui.handleOtherInstanceDetected(evt)
case event.MediaServerStartedEvent:
ui.handleMediaServerStarted(evt)
case event.FatalErrorOccurredEvent:
ui.handleFatalErrorOccurred(evt)
default:
ui.logger.Warn("unhandled event", "event", evt)
}
})
case <-ctx.Done():
return
case <-uiDone:
@ -323,15 +283,6 @@ func (ui *UI) run(ctx context.Context) {
}
}
func (ui *UI) handleMediaServerStarted(evt event.MediaServerStartedEvent) {
ui.mu.Lock()
ui.rtmpURL = evt.RTMPURL
ui.rtmpsURL = evt.RTMPSURL
ui.mu.Unlock()
ui.renderAboutView()
}
func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
// Special case: handle CTRL-C even when a modal is visible.
if event.Key() == tcell.KeyCtrlC {
@ -358,6 +309,8 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
return nil
case ' ':
ui.toggleDestination()
case 'u', 'U':
ui.copySourceURLToClipboard(ui.clipboardAvailable)
case 'c', 'C':
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
case '?':
@ -365,8 +318,6 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
case 'k': // tview vim bindings
handleKeyUp()
}
case tcell.KeyF1, tcell.KeyF2:
ui.fkeyHandler(event.Key())
case tcell.KeyDelete, tcell.KeyBackspace, tcell.KeyBackspace2:
ui.removeDestination()
return nil
@ -377,82 +328,79 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
return event
}
func (ui *UI) fkeyHandler(key tcell.Key) {
var urls []string
if ui.rtmpURL != "" {
urls = append(urls, ui.rtmpURL)
}
if ui.rtmpsURL != "" {
urls = append(urls, ui.rtmpsURL)
}
switch key {
case tcell.KeyF1:
if len(urls) == 0 {
return
}
ui.copySourceURLToClipboard(urls[0])
case tcell.KeyF2:
if len(urls) < 2 {
return
}
ui.copySourceURLToClipboard(urls[1])
}
func (ui *UI) ShowSourceNotLiveModal() {
ui.app.QueueUpdateDraw(func() {
ui.showModal(
pageNameModalStartupCheck,
fmt.Sprintf("Waiting for stream.\nStart streaming to the source URL then try again:\n\n%s", ui.sourceViews.url.GetText(true)),
[]string{"Ok"},
false,
nil,
)
})
}
func (ui *UI) handleStartDestinationFailed(event.StartDestinationFailedEvent) {
ui.showModal(
pageNameModalStartDestinationFailed,
"Waiting for stream.\n\nStart streaming to a source URL then try again.",
[]string{"Ok"},
false,
nil,
)
// ShowStartupCheckModal shows a modal dialog to the user, asking if they want
// to kill a running instance of Octoplex.
//
// The method will block until the user has made a choice, after which the
// channel will receive true if the user wants to quit the other instance, or
// false to quit this instance.
func (ui *UI) ShowStartupCheckModal() bool {
done := make(chan bool)
ui.app.QueueUpdateDraw(func() {
ui.showModal(
pageNameModalStartupCheck,
"Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?",
[]string{"Continue", "Exit"},
false,
func(buttonIndex int, _ string) {
if buttonIndex == 0 {
done <- true
} else {
done <- false
}
},
)
})
return <-done
}
func (ui *UI) handleOtherInstanceDetected(event.OtherInstanceDetectedEvent) {
ui.showModal(
pageNameModalStartupCheck,
"Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?",
[]string{"Continue", "Exit"},
false,
func(buttonIndex int, _ string) {
if buttonIndex == 0 {
ui.dispatch(event.CommandCloseOtherInstance{})
} else {
ui.dispatch(event.CommandQuit{})
}
},
)
func (ui *UI) ShowDestinationErrorModal(name string, err error) {
ui.app.QueueUpdateDraw(func() {
ui.showModal(
pageNameModalDestinationError,
fmt.Sprintf(
"Streaming to %s failed:\n\n%s",
cmp.Or(name, "this destination"),
err,
),
[]string{"Ok"},
true,
nil,
)
})
}
func (ui *UI) handleDestinationStreamExited(evt event.DestinationStreamExitedEvent) {
ui.showModal(
pageNameModalDestinationError,
fmt.Sprintf(
"Streaming to %s failed:\n\n%s",
cmp.Or(evt.Name, "this destination"),
evt.Err,
),
[]string{"Ok"},
true,
nil,
)
}
func (ui *UI) handleFatalErrorOccurred(evt event.FatalErrorOccurredEvent) {
ui.showModal(
pageNameModalFatalError,
fmt.Sprintf(
"An error occurred:\n\n%s",
evt.Message,
),
[]string{"Quit"},
false,
func(int, string) {
ui.dispatch(event.CommandQuit{})
},
)
// ShowFatalErrorModal displays the provided error. It sends a CommandQuit to the
// command channel when the user selects the Quit button.
func (ui *UI) ShowFatalErrorModal(errString string) {
ui.app.QueueUpdateDraw(func() {
ui.showModal(
pageNameModalFatalError,
fmt.Sprintf(
"An error occurred:\n\n%s",
errString,
),
[]string{"Quit"},
false,
func(int, string) {
ui.commandC <- CommandQuit{}
},
)
})
}
func (ui *UI) afterDrawHandler(screen tcell.Screen) {
@ -483,8 +431,11 @@ func (ui *UI) captureScreen(screen tcell.Screen) {
}
}
func (ui *UI) handleAppStateChanged(evt event.AppStateChangedEvent) {
state := evt.State
// SetState sets the state of the terminal user interface.
func (ui *UI) SetState(state domain.AppState) {
if state.Source.ExitReason != "" {
ui.handleMediaServerClosed(state.Source.ExitReason)
}
ui.updatePullProgress(state)
@ -496,7 +447,10 @@ func (ui *UI) handleAppStateChanged(evt event.AppStateChangedEvent) {
ui.hasDestinations = len(state.Destinations) > 0
ui.mu.Unlock()
ui.redrawFromState(state)
// The state is mutable so can't be passed into QueueUpdateDraw, which
// passes it to another goroutine, without cloning it first.
stateClone := state.Clone()
ui.app.QueueUpdateDraw(func() { ui.redrawFromState(stateClone) })
}
func (ui *UI) updatePullProgress(state domain.AppState) {
@ -517,7 +471,9 @@ func (ui *UI) updatePullProgress(state domain.AppState) {
}
if len(pullingContainers) == 0 {
ui.hideModal(pageNameModalPullProgress)
ui.app.QueueUpdateDraw(func() {
ui.hideModal(pageNameModalPullProgress)
})
return
}
@ -531,27 +487,29 @@ func (ui *UI) updatePullProgress(state domain.AppState) {
}
func (ui *UI) updateProgressModal(container domain.Container) {
modalName := string(pageNameModalPullProgress)
ui.app.QueueUpdateDraw(func() {
modalName := string(pageNameModalPullProgress)
var status string
// Avoid showing the long Docker pull status in the modal content.
if len(container.PullStatus) < 30 {
status = container.PullStatus
}
var status string
// Avoid showing the long Docker pull status in the modal content.
if len(container.PullStatus) < 30 {
status = container.PullStatus
}
modalContent := fmt.Sprintf(
"Pulling %s:\n%s (%d%%)\n\n%s",
container.ImageName,
status,
container.PullPercent,
container.PullProgress,
)
modalContent := fmt.Sprintf(
"Pulling %s:\n%s (%d%%)\n\n%s",
container.ImageName,
status,
container.PullPercent,
container.PullProgress,
)
if ui.pages.HasPage(modalName) {
ui.pullProgressModal.SetText(modalContent)
} else {
ui.pages.AddPage(modalName, ui.pullProgressModal, true, true)
}
if ui.pages.HasPage(modalName) {
ui.pullProgressModal.SetText(modalContent)
} else {
ui.pages.AddPage(modalName, ui.pullProgressModal, true, true)
}
})
}
// page names represent a specific page in the terminal user interface.
@ -559,21 +517,19 @@ func (ui *UI) updateProgressModal(container domain.Container) {
// Modals should generally have a unique name, which allows them to be stacked
// on top of other modals.
const (
pageNameMain = "main"
pageNameAddDestination = "add-destination"
pageNameViewURLs = "view-urls"
pageNameConfigUpdateFailed = "modal-config-update-failed"
pageNameNoDestinations = "no-destinations"
pageNameModalAbout = "modal-about"
pageNameModalClipboard = "modal-clipboard"
pageNameModalDestinationError = "modal-destination-error"
pageNameModalFatalError = "modal-fatal-error"
pageNameModalPullProgress = "modal-pull-progress"
pageNameModalQuit = "modal-quit"
pageNameModalRemoveDestination = "modal-remove-destination"
pageNameModalSourceError = "modal-source-error"
pageNameModalStartDestinationFailed = "modal-start-destination-failed"
pageNameModalStartupCheck = "modal-startup-check"
pageNameMain = "main"
pageNameAddDestination = "add-destination"
pageNameConfigUpdateFailed = "modal-config-update-failed"
pageNameNoDestinations = "no-destinations"
pageNameModalAbout = "modal-about"
pageNameModalClipboard = "modal-clipboard"
pageNameModalDestinationError = "modal-destination-error"
pageNameModalFatalError = "modal-fatal-error"
pageNameModalPullProgress = "modal-pull-progress"
pageNameModalQuit = "modal-quit"
pageNameModalRemoveDestination = "modal-remove-destination"
pageNameModalSourceError = "modal-source-error"
pageNameModalStartupCheck = "modal-startup-check"
)
// modalVisible returns true if any modal, including the add destination form,
@ -680,6 +636,26 @@ func (ui *UI) hideModal(pageName string) {
ui.app.SetFocus(ui.destView)
}
func (ui *UI) handleMediaServerClosed(exitReason string) {
ui.app.QueueUpdateDraw(func() {
if ui.pages.HasPage(pageNameModalSourceError) {
return
}
modal := tview.NewModal()
modal.SetText("Mediaserver error: " + exitReason).
AddButtons([]string{"Quit"}).
SetBackgroundColor(tcell.ColorBlack).
SetTextColor(tcell.ColorWhite).
SetDoneFunc(func(int, string) {
ui.commandC <- CommandQuit{}
})
modal.SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite))
ui.pages.AddPage(pageNameModalSourceError, modal, true, true)
})
}
const dash = "—"
const (
@ -720,6 +696,8 @@ func (ui *UI) redrawFromState(state domain.AppState) {
SetSelectable(false)
}
ui.sourceViews.url.SetText(state.Source.RTMPURL)
tracks := dash
if state.Source.Live && len(state.Source.Tracks) > 0 {
tracks = strings.Join(state.Source.Tracks, ", ")
@ -825,6 +803,24 @@ func (ui *UI) Close() {
ui.app.Stop()
}
func (ui *UI) ConfigUpdateFailed(err error) {
ui.app.QueueUpdateDraw(func() {
ui.showModal(
pageNameConfigUpdateFailed,
"Configuration update failed:\n\n"+err.Error(),
[]string{"Ok"},
false,
func(int, string) {
pageName, frontPage := ui.pages.GetFrontPage()
if pageName != pageNameAddDestination {
ui.logger.Warn("Unexpected page when configuration form closed", "page", pageName)
}
ui.app.SetFocus(frontPage)
},
)
})
}
func (ui *UI) addDestination() {
const (
inputLen = 60
@ -844,10 +840,10 @@ func (ui *UI) addDestination() {
AddInputField(inputLabelName, "My stream", inputLen, nil, nil).
AddInputField(inputLabelURL, "rtmp://", inputLen, nil, nil).
AddButton("Add", func() {
ui.dispatch(event.CommandAddDestination{
ui.commandC <- CommandAddDestination{
DestinationName: form.GetFormItemByLabel(inputLabelName).(*tview.InputField).GetText(),
URL: form.GetFormItemByLabel(inputLabelURL).(*tview.InputField).GetText(),
})
}
}).
AddButton("Cancel", func() {
ui.closeAddDestinationForm()
@ -902,42 +898,30 @@ func (ui *UI) removeDestination() {
false,
func(buttonIndex int, _ string) {
if buttonIndex == 0 {
ui.dispatch(event.CommandRemoveDestination{URL: url})
ui.commandC <- CommandRemoveDestination{URL: url}
}
},
)
}
func (ui *UI) handleDestinationAdded(event.DestinationAddedEvent) {
// DestinationAdded should be called when a new destination is added.
func (ui *UI) DestinationAdded() {
ui.mu.Lock()
ui.hasDestinations = true
ui.mu.Unlock()
ui.pages.HidePage(pageNameNoDestinations)
ui.closeAddDestinationForm()
ui.selectLastDestination()
ui.app.QueueUpdateDraw(func() {
ui.pages.HidePage(pageNameNoDestinations)
ui.closeAddDestinationForm()
ui.selectLastDestination()
})
}
func (ui *UI) handleDestinationRemoved(event.DestinationRemovedEvent) {
// DestinationRemoved should be called when a destination is removed.
func (ui *UI) DestinationRemoved() {
ui.selectPreviousDestination()
}
func (ui *UI) handleDestinationEventError(err error) {
ui.showModal(
pageNameConfigUpdateFailed,
"Configuration update failed:\n\n"+err.Error(),
[]string{"Ok"},
false,
func(int, string) {
pageName, frontPage := ui.pages.GetFrontPage()
if pageName != pageNameAddDestination {
ui.logger.Warn("Unexpected page when configuration form closed", "page", pageName)
}
ui.app.SetFocus(frontPage)
},
)
}
func (ui *UI) closeAddDestinationForm() {
var hasDestinations bool
ui.mu.Lock()
@ -978,21 +962,22 @@ func (ui *UI) toggleDestination() {
switch ss {
case startStateNotStarted:
ui.urlsToStartState[url] = startStateStarting
ui.dispatch(event.CommandStartDestination{URL: url})
ui.commandC <- CommandStartDestination{URL: url}
case startStateStarting:
// do nothing
return
case startStateStarted:
ui.dispatch(event.CommandStopDestination{URL: url})
ui.commandC <- CommandStopDestination{URL: url}
}
}
func (ui *UI) copySourceURLToClipboard(url string) {
func (ui *UI) copySourceURLToClipboard(clipboardAvailable bool) {
var text string
if ui.clipboardAvailable {
url := ui.sourceViews.url.GetText(true)
if clipboardAvailable {
clipboard.Write(clipboard.FmtText, []byte(url))
text = "URL copied to clipboard:\n\n" + url
text = "Source URL copied to clipboard:\n\n" + url
} else {
text = "Copy to clipboard not available:\n\n" + url
}
@ -1036,7 +1021,7 @@ func (ui *UI) confirmQuit() {
false,
func(buttonIndex int, _ string) {
if buttonIndex == 0 {
ui.dispatch(event.CommandQuit{})
ui.commandC <- CommandQuit{}
}
},
)

89
main.go
View File

@ -3,14 +3,11 @@ package main
import (
"cmp"
"context"
"errors"
"flag"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"os/signal"
"runtime/debug"
"syscall"
@ -30,25 +27,16 @@ var (
date string
)
var errShutdown = errors.New("shutdown")
func main() {
var exitStatus int
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := run(); errors.Is(err, errShutdown) {
exitStatus = 130
} else if err != nil {
exitStatus = 1
if err := run(ctx); err != nil {
_, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n")
}
os.Exit(exitStatus)
}
func run() error {
ctx, cancel := context.WithCancelCause(context.Background())
defer cancel(nil)
func run(ctx context.Context) error {
configService, err := config.NewDefaultService()
if err != nil {
return fmt.Errorf("build config service: %w", err)
@ -84,26 +72,11 @@ func run() error {
if err != nil {
return fmt.Errorf("read or create config: %w", err)
}
headless := os.Getenv("OCTO_HEADLESS") != ""
logger, err := buildLogger(cfg.LogFile, headless)
logger, err := buildLogger(cfg.LogFile)
if err != nil {
return fmt.Errorf("build logger: %w", err)
}
if headless {
// When running in headless mode tview doesn't handle SIGINT for us.
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-ch
logger.Info("Received interrupt signal, exiting")
signal.Stop(ch)
cancel(errShutdown)
}()
}
var clipboardAvailable bool
if err = clipboard.Init(); err != nil {
logger.Warn("Clipboard not available", "err", err)
@ -124,22 +97,22 @@ func run() error {
return fmt.Errorf("read build info: %w", err)
}
app := app.New(app.Params{
ConfigService: configService,
DockerClient: dockerClient,
Headless: headless,
ClipboardAvailable: clipboardAvailable,
ConfigFilePath: configService.Path(),
BuildInfo: domain.BuildInfo{
GoVersion: buildInfo.GoVersion,
Version: version,
Commit: commit,
Date: date,
return app.Run(
ctx,
app.RunParams{
ConfigService: configService,
DockerClient: dockerClient,
ClipboardAvailable: clipboardAvailable,
ConfigFilePath: configService.Path(),
BuildInfo: domain.BuildInfo{
GoVersion: buildInfo.GoVersion,
Version: version,
Commit: commit,
Date: date,
},
Logger: logger,
},
Logger: logger,
})
return app.Run(ctx)
)
}
// editConfigFile opens the config file in the user's editor.
@ -189,24 +162,10 @@ func printUsage() {
os.Stderr.WriteString("\n")
os.Stderr.WriteString("Additionally, Octoplex can be configured with the following environment variables:\n\n")
os.Stderr.WriteString(" OCTO_DEBUG Enables debug logging if set\n")
os.Stderr.WriteString(" OCTO_HEADLESS Enables headless mode if set (experimental)\n\n")
}
// buildLogger builds the logger, which may be a no-op logger.
func buildLogger(cfg config.LogFile, headless bool) (*slog.Logger, error) {
build := func(w io.Writer) *slog.Logger {
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(w, &handlerOpts))
}
// In headless mode, always log to stderr.
if headless {
return build(os.Stderr), nil
}
func buildLogger(cfg config.LogFile) (*slog.Logger, error) {
if !cfg.Enabled {
return slog.New(slog.DiscardHandler), nil
}
@ -216,5 +175,9 @@ func buildLogger(cfg config.LogFile, headless bool) (*slog.Logger, error) {
return nil, fmt.Errorf("error opening log file: %w", err)
}
return build(fptr), nil
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(fptr, &handlerOpts)), nil
}

View File

@ -29,12 +29,6 @@ dir = "{{cwd}}"
run = "golangci-lint run"
alias = "l"
[tasks.fmt]
description = "Run formatter"
dir = "{{cwd}}"
run = "goimports -w ."
alias = "f"
[tasks.generate_mocks]
description = "Generate mocks"
dir = "{{cwd}}"