feat: multiplexer

This commit is contained in:
Rob Watson 2025-02-01 04:26:37 +01:00 committed by Rob Watson
parent 6678489f69
commit 246c6f6669
15 changed files with 1221 additions and 142 deletions

View File

@ -7,6 +7,6 @@ RUN apk add --no-cache \
curl curl
COPY --from=mediamtx /mediamtx /usr/bin/mediamtx COPY --from=mediamtx /mediamtx /usr/bin/mediamtx
COPY --from=mediamtx /mediamtx.yml /mediamtx.yml COPY build/mediamtx.yml /mediamtx.yml
CMD ["/usr/bin/mediamtx"] CMD ["/usr/bin/mediamtx"]

705
build/mediamtx.yml Normal file
View File

@ -0,0 +1,705 @@
###############################################
# Global settings
# Settings in this section are applied anywhere.
###############################################
# Global settings -> General
# Verbosity of the program; available values are "error", "warn", "info", "debug".
logLevel: info
# Destinations of log messages; available values are "stdout", "file" and "syslog".
logDestinations: [stdout]
# If "file" is in logDestinations, this is the file which will receive the logs.
logFile: mediamtx.log
# Timeout of read operations.
readTimeout: 10s
# Timeout of write operations.
writeTimeout: 10s
# Size of the queue of outgoing packets.
# A higher value allows to increase throughput, a lower value allows to save RAM.
writeQueueSize: 512
# Maximum size of outgoing UDP packets.
# This can be decreased to avoid fragmentation on networks with a low UDP MTU.
udpMaxPayloadSize: 1472
# Command to run when a client connects to the server.
# This is terminated with SIGINT when a client disconnects from the server.
# The following environment variables are available:
# * MTX_CONN_TYPE: connection type
# * MTX_CONN_ID: connection ID
# * RTSP_PORT: RTSP server port
runOnConnect:
# Restart the command if it exits.
runOnConnectRestart: no
# Command to run when a client disconnects from the server.
# Environment variables are the same of runOnConnect.
runOnDisconnect:
###############################################
# Global settings -> Authentication
# Authentication method. Available values are:
# * internal: users are stored in the configuration file
# * http: an external HTTP URL is contacted to perform authentication
# * jwt: an external identity server provides authentication through JWTs
authMethod: internal
# Internal authentication.
# list of users.
authInternalUsers:
# Default unprivileged user.
# Username. 'any' means any user, including anonymous ones.
- user: any
# Password. Not used in case of 'any' user.
pass:
# IPs or networks allowed to use this user. An empty list means any IP.
ips: []
# List of permissions.
permissions:
# Available actions are: publish, read, playback, api, metrics, pprof.
- action: publish
# Paths can be set to further restrict access to a specific path.
# An empty path means any path.
# Regular expressions can be used by using a tilde as prefix.
path:
- action: read
path:
- action: playback
path:
# Default administrator.
# This allows to use API, metrics and PPROF without authentication,
# if the IP is localhost.
- user: any
pass:
ips: ['127.0.0.1', '::1', '172.17.0.0/16']
permissions:
- action: api
- action: metrics
- action: pprof
# HTTP-based authentication.
# URL called to perform authentication. Every time a user wants
# to authenticate, the server calls this URL with the POST method
# and a body containing:
# {
# "user": "user",
# "password": "password",
# "ip": "ip",
# "action": "publish|read|playback|api|metrics|pprof",
# "path": "path",
# "protocol": "rtsp|rtmp|hls|webrtc|srt",
# "id": "id",
# "query": "query"
# }
# If the response code is 20x, authentication is accepted, otherwise
# it is discarded.
authHTTPAddress:
# Actions to exclude from HTTP-based authentication.
# Format is the same as the one of user permissions.
authHTTPExclude:
- action: api
- action: metrics
- action: pprof
# JWT-based authentication.
# Users have to login through an external identity server and obtain a JWT.
# This JWT must contain the claim "mediamtx_permissions" with permissions,
# for instance:
# {
# ...
# "mediamtx_permissions": [
# {
# "action": "publish",
# "path": "somepath"
# }
# ]
# }
# Users are expected to pass the JWT in the Authorization header or as a query parameter.
# This is the JWKS URL that will be used to pull (once) the public key that allows
# to validate JWTs.
authJWTJWKS:
# name of the claim that contains permissions.
authJWTClaimKey: mediamtx_permissions
###############################################
# Global settings -> Control API
# Enable controlling the server through the Control API.
api: no
# Address of the Control API listener.
apiAddress: :9997
# Enable TLS/HTTPS on the Control API server.
apiEncryption: no
# Path to the server key. This is needed only when encryption is yes.
# This can be generated with:
# openssl genrsa -out server.key 2048
# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
apiServerKey: server.key
# Path to the server certificate.
apiServerCert: server.crt
# Value of the Access-Control-Allow-Origin header provided in every HTTP response.
apiAllowOrigin: '*'
# List of IPs or CIDRs of proxies placed before the HTTP server.
# If the server receives a request from one of these entries, IP in logs
# will be taken from the X-Forwarded-For header.
apiTrustedProxies: []
###############################################
# Global settings -> Metrics
# Enable Prometheus-compatible metrics.
metrics: no
# Address of the metrics HTTP listener.
metricsAddress: :9998
# Enable TLS/HTTPS on the Metrics server.
metricsEncryption: no
# Path to the server key. This is needed only when encryption is yes.
# This can be generated with:
# openssl genrsa -out server.key 2048
# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
metricsServerKey: server.key
# Path to the server certificate.
metricsServerCert: server.crt
# Value of the Access-Control-Allow-Origin header provided in every HTTP response.
metricsAllowOrigin: '*'
# List of IPs or CIDRs of proxies placed before the HTTP server.
# If the server receives a request from one of these entries, IP in logs
# will be taken from the X-Forwarded-For header.
metricsTrustedProxies: []
###############################################
# Global settings -> PPROF
# Enable pprof-compatible endpoint to monitor performances.
pprof: no
# Address of the pprof listener.
pprofAddress: :9999
# Enable TLS/HTTPS on the pprof server.
pprofEncryption: no
# Path to the server key. This is needed only when encryption is yes.
# This can be generated with:
# openssl genrsa -out server.key 2048
# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
pprofServerKey: server.key
# Path to the server certificate.
pprofServerCert: server.crt
# Value of the Access-Control-Allow-Origin header provided in every HTTP response.
pprofAllowOrigin: '*'
# List of IPs or CIDRs of proxies placed before the HTTP server.
# If the server receives a request from one of these entries, IP in logs
# will be taken from the X-Forwarded-For header.
pprofTrustedProxies: []
###############################################
# Global settings -> Playback server
# Enable downloading recordings from the playback server.
playback: no
# Address of the playback server listener.
playbackAddress: :9996
# Enable TLS/HTTPS on the playback server.
playbackEncryption: no
# Path to the server key. This is needed only when encryption is yes.
# This can be generated with:
# openssl genrsa -out server.key 2048
# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
playbackServerKey: server.key
# Path to the server certificate.
playbackServerCert: server.crt
# Value of the Access-Control-Allow-Origin header provided in every HTTP response.
playbackAllowOrigin: '*'
# List of IPs or CIDRs of proxies placed before the HTTP server.
# If the server receives a request from one of these entries, IP in logs
# will be taken from the X-Forwarded-For header.
playbackTrustedProxies: []
###############################################
# Global settings -> RTSP server
# Enable publishing and reading streams with the RTSP protocol.
rtsp: yes
# List of enabled RTSP transport protocols.
# UDP is the most performant, but doesn't work when there's a NAT/firewall between
# server and clients, and doesn't support encryption.
# UDP-multicast allows to save bandwidth when clients are all in the same LAN.
# TCP is the most versatile, and does support encryption.
# The handshake is always performed with TCP.
rtspTransports: [udp, multicast, tcp]
# Encrypt handshakes and TCP streams with TLS (RTSPS).
# Available values are "no", "strict", "optional".
rtspEncryption: "no"
# Address of the TCP/RTSP listener. This is needed only when encryption is "no" or "optional".
rtspAddress: :8554
# Address of the TCP/TLS/RTSPS listener. This is needed only when encryption is "strict" or "optional".
rtspsAddress: :8322
# Address of the UDP/RTP listener. This is needed only when "udp" is in protocols.
rtpAddress: :8000
# Address of the UDP/RTCP listener. This is needed only when "udp" is in protocols.
rtcpAddress: :8001
# IP range of all UDP-multicast listeners. This is needed only when "multicast" is in protocols.
multicastIPRange: 224.1.0.0/16
# Port of all UDP-multicast/RTP listeners. This is needed only when "multicast" is in protocols.
multicastRTPPort: 8002
# Port of all UDP-multicast/RTCP listeners. This is needed only when "multicast" is in protocols.
multicastRTCPPort: 8003
# Path to the server key. This is needed only when encryption is "strict" or "optional".
# This can be generated with:
# openssl genrsa -out server.key 2048
# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
rtspServerKey: server.key
# Path to the server certificate. This is needed only when encryption is "strict" or "optional".
rtspServerCert: server.crt
# Authentication methods. Available are "basic" and "digest".
# "digest" doesn't provide any additional security and is available for compatibility only.
rtspAuthMethods: [basic]
###############################################
# Global settings -> RTMP server
# Enable publishing and reading streams with the RTMP protocol.
rtmp: yes
# Address of the RTMP listener. This is needed only when encryption is "no" or "optional".
rtmpAddress: :1935
# Encrypt connections with TLS (RTMPS).
# Available values are "no", "strict", "optional".
rtmpEncryption: "no"
# Address of the RTMPS listener. This is needed only when encryption is "strict" or "optional".
rtmpsAddress: :1936
# Path to the server key. This is needed only when encryption is "strict" or "optional".
# This can be generated with:
# openssl genrsa -out server.key 2048
# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
rtmpServerKey: server.key
# Path to the server certificate. This is needed only when encryption is "strict" or "optional".
rtmpServerCert: server.crt
###############################################
# Global settings -> HLS server
# Enable reading streams with the HLS protocol.
hls: yes
# Address of the HLS listener.
hlsAddress: :8888
# Enable TLS/HTTPS on the HLS server.
# This is required for Low-Latency HLS.
hlsEncryption: no
# Path to the server key. This is needed only when encryption is yes.
# This can be generated with:
# openssl genrsa -out server.key 2048
# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
hlsServerKey: server.key
# Path to the server certificate.
hlsServerCert: server.crt
# Value of the Access-Control-Allow-Origin header provided in every HTTP response.
# This allows to play the HLS stream from an external website.
hlsAllowOrigin: '*'
# List of IPs or CIDRs of proxies placed before the HLS server.
# If the server receives a request from one of these entries, IP in logs
# will be taken from the X-Forwarded-For header.
hlsTrustedProxies: []
# By default, HLS is generated only when requested by a user.
# This option allows to generate it always, avoiding the delay between request and generation.
hlsAlwaysRemux: no
# Variant of the HLS protocol to use. Available options are:
# * mpegts - uses MPEG-TS segments, for maximum compatibility.
# * fmp4 - uses fragmented MP4 segments, more efficient.
# * lowLatency - uses Low-Latency HLS.
hlsVariant: lowLatency
# Number of HLS segments to keep on the server.
# Segments allow to seek through the stream.
# Their number doesn't influence latency.
hlsSegmentCount: 7
# Minimum duration of each segment.
# A player usually puts 3 segments in a buffer before reproducing the stream.
# The final segment duration is also influenced by the interval between IDR frames,
# since the server changes the duration in order to include at least one IDR frame
# in each segment.
hlsSegmentDuration: 1s
# Minimum duration of each part.
# A player usually puts 3 parts in a buffer before reproducing the stream.
# Parts are used in Low-Latency HLS in place of segments.
# Part duration is influenced by the distance between video/audio samples
# and is adjusted in order to produce segments with a similar duration.
hlsPartDuration: 200ms
# Maximum size of each segment.
# This prevents RAM exhaustion.
hlsSegmentMaxSize: 50M
# Directory in which to save segments, instead of keeping them in the RAM.
# This decreases performance, since reading from disk is less performant than
# reading from RAM, but allows to save RAM.
hlsDirectory: ''
# The muxer will be closed when there are no
# reader requests and this amount of time has passed.
hlsMuxerCloseAfter: 60s
###############################################
# Global settings -> WebRTC server
# Enable publishing and reading streams with the WebRTC protocol.
webrtc: yes
# Address of the WebRTC HTTP listener.
webrtcAddress: :8889
# Enable TLS/HTTPS on the WebRTC server.
webrtcEncryption: no
# Path to the server key.
# This can be generated with:
# openssl genrsa -out server.key 2048
# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
webrtcServerKey: server.key
# Path to the server certificate.
webrtcServerCert: server.crt
# Value of the Access-Control-Allow-Origin header provided in every HTTP response.
# This allows to play the WebRTC stream from an external website.
webrtcAllowOrigin: '*'
# List of IPs or CIDRs of proxies placed before the WebRTC server.
# If the server receives a request from one of these entries, IP in logs
# will be taken from the X-Forwarded-For header.
webrtcTrustedProxies: []
# Address of a local UDP listener that will receive connections.
# Use a blank string to disable.
webrtcLocalUDPAddress: :8189
# Address of a local TCP listener that will receive connections.
# This is disabled by default since TCP is less efficient than UDP and
# introduces a progressive delay when network is congested.
webrtcLocalTCPAddress: ''
# WebRTC clients need to know the IP of the server.
# Gather IPs from interfaces and send them to clients.
webrtcIPsFromInterfaces: yes
# List of interfaces whose IPs will be sent to clients.
# An empty value means to use all available interfaces.
webrtcIPsFromInterfacesList: []
# List of additional hosts or IPs to send to clients.
webrtcAdditionalHosts: []
# ICE servers. Needed only when local listeners can't be reached by clients.
# STUN servers allows to obtain and share the public IP of the server.
# TURN/TURNS servers forces all traffic through them.
webrtcICEServers2: []
# - url: stun:stun.l.google.com:19302
# if user is "AUTH_SECRET", then authentication is secret based.
# the secret must be inserted into the password field.
# username: ''
# password: ''
# clientOnly: false
# Time to wait for the WebRTC handshake to complete.
webrtcHandshakeTimeout: 10s
# Maximum time to gather video tracks.
webrtcTrackGatherTimeout: 2s
###############################################
# Global settings -> SRT server
# Enable publishing and reading streams with the SRT protocol.
srt: yes
# Address of the SRT listener.
srtAddress: :8890
###############################################
# Default path settings
# Settings in "pathDefaults" are applied anywhere,
# unless they are overridden in "paths".
pathDefaults:
###############################################
# Default path settings -> General
# Source of the stream. This can be:
# * publisher -> the stream is provided by a RTSP, RTMP, WebRTC or SRT client
# * rtsp://existing-url -> the stream is pulled from another RTSP server / camera
# * rtsps://existing-url -> the stream is pulled from another RTSP server / camera with RTSPS
# * rtmp://existing-url -> the stream is pulled from another RTMP server / camera
# * rtmps://existing-url -> the stream is pulled from another RTMP server / camera with RTMPS
# * http://existing-url/stream.m3u8 -> the stream is pulled from another HLS server / camera
# * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server / camera with HTTPS
# * udp://ip:port -> the stream is pulled with UDP, by listening on the specified IP and port
# * srt://existing-url -> the stream is pulled from another SRT server / camera
# * whep://existing-url -> the stream is pulled from another WebRTC server / camera
# * wheps://existing-url -> the stream is pulled from another WebRTC server / camera with HTTPS
# * redirect -> the stream is provided by another path or server
# * rpiCamera -> the stream is provided by a Raspberry Pi Camera
# The following variables can be used in the source string:
# * $MTX_QUERY: query parameters (passed by first reader)
# * $G1, $G2, ...: regular expression groups, if path name is
# a regular expression.
source: publisher
# If the source is a URL, and the source certificate is self-signed
# or invalid, you can provide the fingerprint of the certificate in order to
# validate it anyway. It can be obtained by running:
# openssl s_client -connect source_ip:source_port </dev/null 2>/dev/null | sed -n '/BEGIN/,/END/p' > server.crt
# openssl x509 -in server.crt -noout -fingerprint -sha256 | cut -d "=" -f2 | tr -d ':'
sourceFingerprint:
# If the source is a URL, it will be pulled only when at least
# one reader is connected, saving bandwidth.
sourceOnDemand: no
# If sourceOnDemand is "yes", readers will be put on hold until the source is
# ready or until this amount of time has passed.
sourceOnDemandStartTimeout: 10s
# If sourceOnDemand is "yes", the source will be closed when there are no
# readers connected and this amount of time has passed.
sourceOnDemandCloseAfter: 10s
# Maximum number of readers. Zero means no limit.
maxReaders: 0
# SRT encryption passphrase require to read from this path
srtReadPassphrase:
# If the stream is not available, redirect readers to this path.
# It can be can be a relative path (i.e. /otherstream) or an absolute RTSP URL.
fallback:
###############################################
# Default path settings -> Record
# Record streams to disk.
record: no
# Path of recording segments.
# Extension is added automatically.
# Available variables are %path (path name), %Y %m %d %H %M %S %f %s (time in strftime format)
recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f
# Format of recorded segments.
# Available formats are "fmp4" (fragmented MP4) and "mpegts" (MPEG-TS).
recordFormat: fmp4
# fMP4 segments are concatenation of small MP4 files (parts), each with this duration.
# MPEG-TS segments are concatenation of 188-bytes packets, flushed to disk with this period.
# When a system failure occurs, the last part gets lost.
# Therefore, the part duration is equal to the RPO (recovery point objective).
recordPartDuration: 1s
# Minimum duration of each segment.
recordSegmentDuration: 1h
# Delete segments after this timespan.
# Set to 0s to disable automatic deletion.
recordDeleteAfter: 24h
###############################################
# Default path settings -> Publisher source (when source is "publisher")
# Allow another client to disconnect the current publisher and publish in its place.
overridePublisher: yes
# SRT encryption passphrase required to publish to this path
srtPublishPassphrase:
###############################################
# Default path settings -> RTSP source (when source is a RTSP or a RTSPS URL)
# Transport protocol used to pull the stream. available values are "automatic", "udp", "multicast", "tcp".
rtspTransport: automatic
# Support sources that don't provide server ports or use random server ports. This is a security issue
# and must be used only when interacting with sources that require it.
rtspAnyPort: no
# Range header to send to the source, in order to start streaming from the specified offset.
# available values:
# * clock: Absolute time
# * npt: Normal Play Time
# * smpte: SMPTE timestamps relative to the start of the recording
rtspRangeType:
# Available values:
# * clock: UTC ISO 8601 combined date and time string, e.g. 20230812T120000Z
# * npt: duration such as "300ms", "1.5m" or "2h45m", valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h"
# * smpte: duration such as "300ms", "1.5m" or "2h45m", valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h"
rtspRangeStart:
###############################################
# Default path settings -> Redirect source (when source is "redirect")
# RTSP URL which clients will be redirected to.
sourceRedirect:
###############################################
# Default path settings -> Raspberry Pi Camera source (when source is "rpiCamera")
# ID of the camera
rpiCameraCamID: 0
# Width of frames
rpiCameraWidth: 1920
# Height of frames
rpiCameraHeight: 1080
# Flip horizontally
rpiCameraHFlip: false
# Flip vertically
rpiCameraVFlip: false
# Brightness [-1, 1]
rpiCameraBrightness: 0
# Contrast [0, 16]
rpiCameraContrast: 1
# Saturation [0, 16]
rpiCameraSaturation: 1
# Sharpness [0, 16]
rpiCameraSharpness: 1
# Exposure mode.
# values: normal, short, long, custom
rpiCameraExposure: normal
# Auto-white-balance mode.
# values: auto, incandescent, tungsten, fluorescent, indoor, daylight, cloudy, custom
rpiCameraAWB: auto
# Auto-white-balance fixed gains. This can be used in place of rpiCameraAWB.
# format: [red,blue]
rpiCameraAWBGains: [0, 0]
# Denoise operating mode.
# values: off, cdn_off, cdn_fast, cdn_hq
rpiCameraDenoise: "off"
# Fixed shutter speed, in microseconds.
rpiCameraShutter: 0
# Metering mode of the AEC/AGC algorithm.
# values: centre, spot, matrix, custom
rpiCameraMetering: centre
# Fixed gain
rpiCameraGain: 0
# EV compensation of the image [-10, 10]
rpiCameraEV: 0
# Region of interest, in format x,y,width,height (all normalized between 0 and 1)
rpiCameraROI:
# Whether to enable HDR on Raspberry Camera 3.
rpiCameraHDR: false
# Tuning file
rpiCameraTuningFile:
# Sensor mode, in format [width]:[height]:[bit-depth]:[packing]
# bit-depth and packing are optional.
rpiCameraMode:
# frames per second
rpiCameraFPS: 30
# Autofocus mode
# values: auto, manual, continuous
rpiCameraAfMode: continuous
# Autofocus range
# values: normal, macro, full
rpiCameraAfRange: normal
# Autofocus speed
# values: normal, fast
rpiCameraAfSpeed: normal
# Lens position (for manual autofocus only), will be set to focus to a specific distance
# calculated by the following formula: d = 1 / value
# Examples: 0 moves the lens to infinity.
# 0.5 moves the lens to focus on objects 2m away.
# 2 moves the lens to focus on objects 50cm away.
rpiCameraLensPosition: 0.0
# Specifies the autofocus window, in the form x,y,width,height where the coordinates
# are given as a proportion of the entire image.
rpiCameraAfWindow:
# Manual flicker correction period, in microseconds.
rpiCameraFlickerPeriod: 0
# Enables printing text on each frame.
rpiCameraTextOverlayEnable: false
# Text that is printed on each frame.
# format is the one of the strftime() function.
rpiCameraTextOverlay: '%Y-%m-%d %H:%M:%S - MediaMTX'
# Codec. Available values: auto, hardwareH264, softwareH264
rpiCameraCodec: auto
# Period between IDR frames
rpiCameraIDRPeriod: 60
# Bitrate
rpiCameraBitrate: 5000000
# H264 profile
rpiCameraProfile: main
# H264 level
rpiCameraLevel: '4.1'
###############################################
# Default path settings -> Hooks
# Command to run when this path is initialized.
# This can be used to publish a stream when the server is launched.
# This is terminated with SIGINT when the program closes.
# The following environment variables are available:
# * MTX_PATH: path name
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnInit:
# Restart the command if it exits.
runOnInitRestart: no
# Command to run when this path is requested by a reader
# and no one is publishing to this path yet.
# This can be used to publish a stream on demand.
# This is terminated with SIGINT when there are no readers anymore.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by first reader)
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnDemand:
# Restart the command if it exits.
runOnDemandRestart: no
# Readers will be put on hold until the runOnDemand command starts publishing
# or until this amount of time has passed.
runOnDemandStartTimeout: 10s
# The command will be closed when there are no
# readers connected and this amount of time has passed.
runOnDemandCloseAfter: 10s
# Command to run when there are no readers anymore.
# Environment variables are the same of runOnDemand.
runOnUnDemand:
# Command to run when the stream is ready to be read, whenever it is
# published by a client or pulled from a server / camera.
# This is terminated with SIGINT when the stream is not ready anymore.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by publisher)
# * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_ID: source ID
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnReady:
# Restart the command if it exits.
runOnReadyRestart: no
# Command to run when the stream is not available anymore.
# Environment variables are the same of runOnReady.
runOnNotReady:
# Command to run when a client starts reading.
# This is terminated with SIGINT when a client stops reading.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by reader)
# * MTX_READER_TYPE: reader type
# * MTX_READER_ID: reader ID
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnRead:
# Restart the command if it exits.
runOnReadRestart: no
# Command to run when a client stops reading.
# Environment variables are the same of runOnRead.
runOnUnread:
# Command to run when a recording segment is created.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_SEGMENT_PATH: segment file path
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnRecordSegmentCreate:
# Command to run when a recording segment is complete.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_SEGMENT_PATH: segment file path
# * MTX_SEGMENT_DURATION: segment duration
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnRecordSegmentComplete:
###############################################
# Path settings
# Settings in "paths" are applied to specific paths, and the map key
# is the name of the path.
# Any setting in "pathDefaults" can be overridden here.
# It's possible to use regular expressions by using a tilde as prefix,
# for example "~^(test1|test2)$" will match both "test1" and "test2",
# for example "~^prefix" will match all paths that start with "prefix".
paths:
# example:
# my_camera:
# source: rtsp://my_camera
# Settings under path "all_others" are applied to all paths that
# do not match another entry.
all_others:

View File

@ -175,9 +175,8 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<
now := time.Now() now := time.Now()
containerStateC := make(chan domain.Container, cmp.Or(params.ChanSize, defaultChanSize)) containerStateC := make(chan domain.Container, cmp.Or(params.ChanSize, defaultChanSize))
errC := make(chan error, 1) errC := make(chan error, 1)
closeWithError := func(err error) { sendError := func(err error) {
errC <- err errC <- err
close(errC)
} }
a.wg.Add(1) a.wg.Add(1)
@ -189,7 +188,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<
pullReader, err := a.apiClient.ImagePull(ctx, params.ContainerConfig.Image, image.PullOptions{}) pullReader, err := a.apiClient.ImagePull(ctx, params.ContainerConfig.Image, image.PullOptions{})
if err != nil { if err != nil {
closeWithError(fmt.Errorf("image pull: %w", err)) sendError(fmt.Errorf("image pull: %w", err))
return return
} }
_, _ = io.Copy(io.Discard, pullReader) _, _ = io.Copy(io.Discard, pullReader)
@ -212,16 +211,17 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<
name, name,
) )
if err != nil { if err != nil {
closeWithError(fmt.Errorf("container create: %w", err)) sendError(fmt.Errorf("container create: %w", err))
return return
} }
containerStateC <- domain.Container{ID: createResp.ID, State: "created"} containerStateC <- domain.Container{ID: createResp.ID, State: "created"}
if err = a.apiClient.ContainerStart(ctx, createResp.ID, container.StartOptions{}); err != nil { if err = a.apiClient.ContainerStart(ctx, createResp.ID, container.StartOptions{}); err != nil {
closeWithError(fmt.Errorf("container start: %w", err)) sendError(fmt.Errorf("container start: %w", err))
return return
} }
a.logger.Info("Started container", "id", shortID(createResp.ID), "duration", time.Since(now)) a.logger.Info("Started container", "id", shortID(createResp.ID), "duration", time.Since(now))
containerStateC <- domain.Container{ID: createResp.ID, State: "running"} containerStateC <- domain.Container{ID: createResp.ID, State: "running"}
a.runContainerLoop(ctx, createResp.ID, containerStateC, errC) a.runContainerLoop(ctx, createResp.ID, containerStateC, errC)
@ -245,6 +245,8 @@ func (a *Client) runContainerLoop(ctx context.Context, containerID string, state
select { select {
case resp := <-containerRespC: case resp := <-containerRespC:
a.logger.Info("Container entered non-running state", "exit_code", resp.StatusCode, "id", shortID(containerID)) a.logger.Info("Container entered non-running state", "exit_code", resp.StatusCode, "id", shortID(containerID))
state.State = "exited"
sendState()
return return
case err := <-containerErrC: case err := <-containerErrC:
// TODO: error handling? // TODO: error handling?

View File

@ -113,7 +113,7 @@ func TestClientRemoveContainers(t *testing.T) {
return running return running
}, },
5*time.Second, 5*time.Second,
100*time.Millisecond, 500*time.Millisecond,
"container group 1 not in RUNNING state", "container group 1 not in RUNNING state",
) )
// check all containers in group 2 are running // check all containers in group 2 are running
@ -124,7 +124,7 @@ func TestClientRemoveContainers(t *testing.T) {
return running return running
}, },
2*time.Second, 2*time.Second,
100*time.Millisecond, 500*time.Millisecond,
"container group 2 not in RUNNING state", "container group 2 not in RUNNING state",
) )
@ -141,7 +141,7 @@ func TestClientRemoveContainers(t *testing.T) {
return err == nil && !running return err == nil && !running
}, },
2*time.Second, 2*time.Second,
100*time.Millisecond, 500*time.Millisecond,
"container group 1 still in RUNNING state", "container group 1 still in RUNNING state",
) )

View File

@ -10,6 +10,7 @@ type AppState struct {
type Source struct { type Source struct {
Container Container Container Container
Live bool Live bool
Listeners int
URL string URL string
} }

2
go.mod
View File

@ -4,6 +4,7 @@ go 1.23.5
require ( require (
github.com/docker/docker v27.5.0+incompatible github.com/docker/docker v27.5.0+incompatible
github.com/docker/go-connections v0.5.0
github.com/gdamore/tcell/v2 v2.7.1 github.com/gdamore/tcell/v2 v2.7.1
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/rivo/tview v0.0.0-20241227133733-17b7edb88c57 github.com/rivo/tview v0.0.0-20241227133733-17b7edb88c57
@ -16,7 +17,6 @@ require (
github.com/containerd/log v0.1.0 // indirect github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/distribution/reference v0.6.0 // indirect github.com/distribution/reference v0.6.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gdamore/encoding v1.0.0 // indirect github.com/gdamore/encoding v1.0.0 // indirect

45
main.go
View File

@ -12,6 +12,7 @@ import (
"git.netflux.io/rob/termstream/container" "git.netflux.io/rob/termstream/container"
"git.netflux.io/rob/termstream/domain" "git.netflux.io/rob/termstream/domain"
"git.netflux.io/rob/termstream/mediaserver" "git.netflux.io/rob/termstream/mediaserver"
"git.netflux.io/rob/termstream/multiplexer"
"git.netflux.io/rob/termstream/terminal" "git.netflux.io/rob/termstream/terminal"
) )
@ -42,6 +43,15 @@ func run(ctx context.Context, cfgReader io.Reader) error {
logger := slog.New(slog.NewTextHandler(logFile, nil)) logger := slog.New(slog.NewTextHandler(logFile, nil))
logger.Info("Starting termstream", slog.Any("initial_state", state)) logger.Info("Starting termstream", slog.Any("initial_state", state))
ui, err := terminal.StartActor(ctx, terminal.StartActorParams{Logger: logger.With("component", "ui")})
if err != nil {
return fmt.Errorf("start tui: %w", err)
}
defer ui.Close()
updateUI := func() { ui.SetState(*state) }
updateUI()
containerClient, err := container.NewClient(ctx, logger.With("component", "container_client")) containerClient, err := container.NewClient(ctx, logger.With("component", "container_client"))
if err != nil { if err != nil {
return fmt.Errorf("new container client: %w", err) return fmt.Errorf("new container client: %w", err)
@ -52,15 +62,14 @@ func run(ctx context.Context, cfgReader io.Reader) error {
ContainerClient: containerClient, ContainerClient: containerClient,
Logger: logger.With("component", "mediaserver"), Logger: logger.With("component", "mediaserver"),
}) })
defer srv.Close()
ui, err := terminal.StartActor(ctx, terminal.StartActorParams{Logger: logger.With("component", "ui")}) mp := multiplexer.NewActor(ctx, multiplexer.NewActorParams{
if err != nil { SourceURL: srv.State().URL,
return fmt.Errorf("start tui: %w", err) ContainerClient: containerClient,
} Logger: logger.With("component", "multiplexer"),
defer ui.Close() })
defer mp.Close()
updateUI := func() { ui.SetState(*state) }
updateUI()
uiTicker := time.NewTicker(uiUpdateInterval) uiTicker := time.NewTicker(uiUpdateInterval)
defer uiTicker.Stop() defer uiTicker.Stop()
@ -73,13 +82,21 @@ func run(ctx context.Context, cfgReader io.Reader) error {
logger.Info("UI closed") logger.Info("UI closed")
return nil return nil
} }
logger.Info("Command received", "cmd", cmd) logger.Info("Command received", "cmd", cmd)
switch c := cmd.(type) {
case terminal.CommandToggleDestination:
mp.ToggleDestination(c.URL)
}
case <-uiTicker.C: case <-uiTicker.C:
// TODO: update UI with current state? // TODO: update UI with current state?
updateUI() updateUI()
case serverState := <-srv.C(): case serverState := <-srv.C():
applyServerState(serverState, state) applyServerState(serverState, state)
updateUI() updateUI()
case mpState := <-mp.C():
applyMultiplexerState(mpState, state)
updateUI()
} }
} }
} }
@ -89,6 +106,18 @@ func applyServerState(serverState domain.Source, appState *domain.AppState) {
appState.Source = serverState appState.Source = serverState
} }
func applyMultiplexerState(destination domain.Destination, appState *domain.AppState) {
for i, dest := range appState.Destinations {
if dest.URL != destination.URL {
continue
}
appState.Destinations[i] = destination
break
}
}
// applyConfig applies the configuration to the app state. // applyConfig applies the configuration to the app state.
func applyConfig(cfg config.Config, appState *domain.AppState) { func applyConfig(cfg config.Config, appState *domain.AppState) {
appState.Destinations = make([]domain.Destination, 0, len(cfg.Destinations)) appState.Destinations = make([]domain.Destination, 0, len(cfg.Destinations))

View File

@ -3,22 +3,28 @@ package mediaserver
import ( import (
"cmp" "cmp"
"context" "context"
"encoding/json"
"fmt" "fmt"
"io"
"log/slog" "log/slog"
"net/http" "net/http"
"strconv"
"time" "time"
typescontainer "github.com/docker/docker/api/types/container" typescontainer "github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"git.netflux.io/rob/termstream/container" "git.netflux.io/rob/termstream/container"
"git.netflux.io/rob/termstream/domain" "git.netflux.io/rob/termstream/domain"
) )
const ( const (
imageNameMediaMTX = "netfluxio/mediamtx-alpine:latest" defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server
rtmpPath = "live" defaultAPIPort = 9997 // default API port for the media server
defaultRTMPPort = 1935 // default RTMP port for the media server
defaultChanSize = 64 // default channel size for asynchronous non-error channels
imageNameMediaMTX = "netfluxio/mediamtx-alpine:latest" // image name for mediamtx
rtmpPath = "live" // RTMP path for the media server
componentName = "mediaserver" // component name, mostly used for Docker labels
httpClientTimeout = time.Second // timeout for outgoing HTTP client requests
) )
// action is an action to be performed by the actor. // action is an action to be performed by the actor.
@ -26,11 +32,14 @@ type action func()
// Actor is responsible for managing the media server. // Actor is responsible for managing the media server.
type Actor struct { type Actor struct {
actorC chan action actorC chan action
stateC chan domain.Source stateC chan domain.Source
containerClient *container.Client containerClient *container.Client
logger *slog.Logger apiPort int
httpClient *http.Client rtmpPort int
fetchIngressStateInterval time.Duration
logger *slog.Logger
httpClient *http.Client
// mutable state // mutable state
state *domain.Source state *domain.Source
@ -39,17 +48,14 @@ type Actor struct {
// StartActorParams contains the parameters for starting a new media server // StartActorParams contains the parameters for starting a new media server
// actor. // actor.
type StartActorParams struct { type StartActorParams struct {
ContainerClient *container.Client APIPort int // defaults to 9997
ChanSize int RTMPPort int // defaults to 1935
Logger *slog.Logger ChanSize int // defaults to 64
FetchIngressStateInterval time.Duration // defaults to 5 seconds
ContainerClient *container.Client
Logger *slog.Logger
} }
const (
defaultChanSize = 64
componentName = "mediaserver"
httpClientTimeout = time.Second
)
// StartActor starts a new media server actor. // StartActor starts a new media server actor.
// //
// Callers must consume the state channel exposed via [C]. // Callers must consume the state channel exposed via [C].
@ -57,18 +63,25 @@ func StartActor(ctx context.Context, params StartActorParams) *Actor {
chanSize := cmp.Or(params.ChanSize, defaultChanSize) chanSize := cmp.Or(params.ChanSize, defaultChanSize)
actor := &Actor{ actor := &Actor{
actorC: make(chan action, chanSize), apiPort: cmp.Or(params.APIPort, defaultAPIPort),
state: new(domain.Source), rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort),
stateC: make(chan domain.Source, chanSize), fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval),
containerClient: params.ContainerClient, actorC: make(chan action, chanSize),
logger: params.Logger, state: new(domain.Source),
httpClient: &http.Client{Timeout: httpClientTimeout}, stateC: make(chan domain.Source, chanSize),
containerClient: params.ContainerClient,
logger: params.Logger,
httpClient: &http.Client{Timeout: httpClientTimeout},
} }
apiPortSpec := nat.Port(strconv.Itoa(actor.apiPort) + ":9997")
rtmpPortSpec := nat.Port(strconv.Itoa(actor.rtmpPort) + ":1935")
exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)})
containerStateC, errC := params.ContainerClient.RunContainer( containerStateC, errC := params.ContainerClient.RunContainer(
ctx, ctx,
container.RunContainerParams{ container.RunContainerParams{
Name: "server", Name: componentName,
ChanSize: chanSize, ChanSize: chanSize,
ContainerConfig: &typescontainer.Config{ ContainerConfig: &typescontainer.Config{
Image: imageNameMediaMTX, Image: imageNameMediaMTX,
@ -86,14 +99,16 @@ func StartActor(ctx context.Context, params StartActorParams) *Actor {
StartInterval: time.Second * 2, StartInterval: time.Second * 2,
Retries: 2, Retries: 2,
}, },
ExposedPorts: exposedPorts,
}, },
HostConfig: &typescontainer.HostConfig{ HostConfig: &typescontainer.HostConfig{
NetworkMode: "host", NetworkMode: "default",
PortBindings: portBindings,
}, },
}, },
) )
actor.state.URL = "rtmp://localhost:1935/" + rtmpPath actor.state.URL = actor.rtmpURL()
go actor.actorLoop(containerStateC, errC) go actor.actorLoop(containerStateC, errC)
@ -128,8 +143,8 @@ func (s *Actor) Close() error {
// actorLoop is the main loop of the media server actor. It only exits when the // actorLoop is the main loop of the media server actor. It only exits when the
// actor is closed. // actor is closed.
func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan error) { func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan error) {
ticker := time.NewTicker(5 * time.Second) fetchStateT := time.NewTicker(s.fetchIngressStateInterval)
defer ticker.Stop() defer fetchStateT.Stop()
sendState := func() { s.stateC <- *s.state } sendState := func() { s.stateC <- *s.state }
@ -145,20 +160,21 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e
s.logger.Error("Error from container client", "error", err, "id", shortID(s.state.Container.ID)) s.logger.Error("Error from container client", "error", err, "id", shortID(s.state.Container.ID))
} }
ticker.Stop() fetchStateT.Stop()
if s.state.Live { if s.state.Live {
s.state.Live = false s.state.Live = false
sendState() sendState()
} }
case <-ticker.C: case <-fetchStateT.C:
ingressLive, err := s.fetchIngressStateFromServer() ingressState, err := s.fetchIngressState()
if err != nil { if err != nil {
s.logger.Error("Error fetching server state", "error", err) s.logger.Error("Error fetching server state", "error", err)
continue continue
} }
if ingressLive != s.state.Live { if ingressState.ready != s.state.Live || ingressState.listeners != s.state.Listeners {
s.state.Live = ingressLive s.state.Live = ingressState.ready
s.state.Listeners = ingressState.listeners
sendState() sendState()
} }
case action, ok := <-s.actorC: case action, ok := <-s.actorC:
@ -170,54 +186,17 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e
} }
} }
type apiResponse[T any] struct { // rtmpURL returns the RTMP URL for the media server, accessible from the host.
Items []T `json:"items"` func (s *Actor) rtmpURL() string {
return fmt.Sprintf("rtmp://localhost:%d/%s", s.rtmpPort, rtmpPath)
} }
type rtmpConnsResponse struct { // apiURL returns the API URL for the media server, accessible from the host.
ID string `json:"id"` func (s *Actor) apiURL() string {
CreatedAt time.Time `json:"created"` return fmt.Sprintf("http://localhost:%d/v3/rtmpconns/list", s.apiPort)
State string `json:"state"`
Path string `json:"path"`
BytesReceived int64 `json:"bytesReceived"`
BytesSent int64 `json:"bytesSent"`
RemoteAddr string `json:"remoteAddr"`
}
func (s *Actor) fetchIngressStateFromServer() (bool, error) {
req, err := http.NewRequest(http.MethodGet, "http://localhost:9997/v3/rtmpconns/list", nil)
if err != nil {
return false, fmt.Errorf("new request: %w", err)
}
httpResp, err := s.httpClient.Do(req)
if err != nil {
return false, fmt.Errorf("do request: %w", err)
}
if httpResp.StatusCode != http.StatusOK {
return false, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
}
respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return false, fmt.Errorf("read body: %w", err)
}
var resp apiResponse[rtmpConnsResponse]
if err = json.Unmarshal(respBody, &resp); err != nil {
return false, fmt.Errorf("unmarshal: %w", err)
}
for _, conn := range resp.Items {
if conn.Path == rtmpPath && conn.State == "publish" {
return true, nil
}
}
return false, nil
} }
// shortID returns the first 12 characters of the given container ID.
func shortID(id string) string { func shortID(id string) string {
if len(id) < 12 { if len(id) < 12 {
return id return id

View File

@ -2,8 +2,6 @@ package mediaserver_test
import ( import (
"context" "context"
"os/exec"
"syscall"
"testing" "testing"
"time" "time"
@ -30,9 +28,10 @@ func TestMediaServerStartStop(t *testing.T) {
assert.False(t, running) assert.False(t, running)
mediaServer := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ mediaServer := mediaserver.StartActor(ctx, mediaserver.StartActorParams{
ChanSize: 1, FetchIngressStateInterval: 500 * time.Millisecond,
ContainerClient: containerClient, ChanSize: 1,
Logger: logger, ContainerClient: containerClient,
Logger: logger,
}) })
require.NoError(t, err) require.NoError(t, err)
testhelpers.ChanDiscard(mediaServer.C()) testhelpers.ChanDiscard(mediaServer.C())
@ -43,8 +42,8 @@ func TestMediaServerStartStop(t *testing.T) {
running, err = containerClient.ContainerRunning(ctx, map[string]string{"component": component}) running, err = containerClient.ContainerRunning(ctx, map[string]string{"component": component})
return err == nil && running return err == nil && running
}, },
5*time.Second, time.Second*10,
250*time.Millisecond, time.Second,
"container not in RUNNING state", "container not in RUNNING state",
) )
@ -52,15 +51,16 @@ func TestMediaServerStartStop(t *testing.T) {
assert.False(t, state.Live) assert.False(t, state.Live)
assert.Equal(t, "rtmp://localhost:1935/live", state.URL) assert.Equal(t, "rtmp://localhost:1935/live", state.URL)
launchFFMPEG(t, "rtmp://localhost:1935/live") testhelpers.StreamFLV(t, "rtmp://localhost:1935/live")
require.Eventually( require.Eventually(
t, t,
func() bool { func() bool {
currState := mediaServer.State() currState := mediaServer.State()
return currState.Live && currState.Container.HealthState == "healthy" return currState.Live && currState.Container.HealthState == "healthy"
}, },
5*time.Second, time.Second*5,
250*time.Millisecond, time.Second,
"actor not healthy and/or in LIVE state", "actor not healthy and/or in LIVE state",
) )
@ -70,33 +70,3 @@ func TestMediaServerStartStop(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
assert.False(t, running) assert.False(t, running)
} }
func launchFFMPEG(t *testing.T, destURL string) *exec.Cmd {
ctx, cancel := context.WithCancel(context.Background())
cmd := exec.CommandContext(
ctx,
"ffmpeg",
"-r", "30",
"-f", "lavfi",
"-i", "testsrc",
"-vf", "scale=1280:960",
"-vcodec", "libx264",
"-profile:v", "baseline",
"-pix_fmt", "yuv420p",
"-f", "flv",
destURL,
)
require.NoError(t, cmd.Start())
t.Cleanup(func() {
if cmd.Process != nil {
_ = cmd.Process.Signal(syscall.SIGINT)
}
cancel()
})
return cmd
}

72
mediaserver/api.go Normal file
View File

@ -0,0 +1,72 @@
package mediaserver
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
type apiResponse[T any] struct {
Items []T `json:"items"`
}
type rtmpConnsResponse struct {
ID string `json:"id"`
CreatedAt time.Time `json:"created"`
State string `json:"state"`
Path string `json:"path"`
BytesReceived int64 `json:"bytesReceived"`
BytesSent int64 `json:"bytesSent"`
RemoteAddr string `json:"remoteAddr"`
}
type ingressStreamState struct {
ready bool
listeners int
}
func (s *Actor) fetchIngressState() (state ingressStreamState, _ error) {
req, err := http.NewRequest(http.MethodGet, s.apiURL(), nil)
if err != nil {
return state, fmt.Errorf("new request: %w", err)
}
httpResp, err := s.httpClient.Do(req)
if err != nil {
return state, fmt.Errorf("do request: %w", err)
}
if httpResp.StatusCode != http.StatusOK {
return state, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
}
respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return state, fmt.Errorf("read body: %w", err)
}
var resp apiResponse[rtmpConnsResponse]
if err = json.Unmarshal(respBody, &resp); err != nil {
return state, fmt.Errorf("unmarshal: %w", err)
}
for _, conn := range resp.Items {
if conn.Path != rtmpPath {
continue
}
switch conn.State {
case "publish":
// mediamtx may report a stream as being in publish state via the API,
// but still refuse to serve them due to being unpublished. This seems to
// be a bug, this is a hacky workaround.
state.ready = conn.BytesReceived > 20_000
case "read":
state.listeners++
}
}
return state, nil
}

7
mise/tasks/push_mediamtx_image Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
#MISE description="Build and push mediamtx image"
set -euo pipefail
docker build -f build/mediamtx.Dockerfile -t netfluxio/mediamtx-alpine:latest .
docker push netfluxio/mediamtx-alpine:latest

170
multiplexer/multiplexer.go Normal file
View File

@ -0,0 +1,170 @@
package multiplexer
import (
"cmp"
"context"
"fmt"
"log/slog"
"strconv"
"sync"
typescontainer "github.com/docker/docker/api/types/container"
"git.netflux.io/rob/termstream/container"
"git.netflux.io/rob/termstream/domain"
)
type action func()
const (
defaultChanSize = 64 // default channel size for asynchronous non-error channels
componentName = "multiplexer" // component name, mostly used for Docker labels
imageNameFFMPEG = "ghcr.io/jrottenberg/ffmpeg:7.1-scratch" // image name for ffmpeg
)
// Actor is responsible for managing the multiplexer.
type Actor struct {
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
sourceURL string
containerClient *container.Client
logger *slog.Logger
actorC chan action
stateC chan domain.Destination
// mutable state
currURLs map[string]struct{}
nextIndex int
}
// NewActorParams contains the parameters for starting a new multiplexer actor.
type NewActorParams struct {
SourceURL string
ChanSize int
ContainerClient *container.Client
Logger *slog.Logger
}
// NewActor starts a new multiplexer actor.
//
// The channel exposed by [C] must be consumed by the caller.
func NewActor(ctx context.Context, params NewActorParams) *Actor {
ctx, cancel := context.WithCancel(ctx)
actor := &Actor{
ctx: ctx,
cancel: cancel,
sourceURL: params.SourceURL,
containerClient: params.ContainerClient,
logger: params.Logger,
actorC: make(chan action, cmp.Or(params.ChanSize, defaultChanSize)),
stateC: make(chan domain.Destination, cmp.Or(params.ChanSize, defaultChanSize)),
currURLs: make(map[string]struct{}),
}
go actor.actorLoop()
return actor
}
// ToggleDestination toggles the destination stream between on and off.
func (a *Actor) ToggleDestination(url string) {
a.actorC <- func() {
labels := map[string]string{"component": componentName, "url": url}
if _, ok := a.currURLs[url]; ok {
a.logger.Info("Stopping live stream", "url", url)
if err := a.containerClient.RemoveContainers(a.ctx, labels); err != nil {
// TODO: error handling
a.logger.Error("Failed to stop live stream", "url", url, "error", err)
}
delete(a.currURLs, url)
return
}
a.logger.Info("Starting live stream", "url", url)
containerStateC, errC := a.containerClient.RunContainer(a.ctx, container.RunContainerParams{
Name: componentName + "-" + strconv.Itoa(a.nextIndex),
ContainerConfig: &typescontainer.Config{
Image: imageNameFFMPEG,
Cmd: []string{
"-i", a.sourceURL,
"-c", "copy",
"-f", "flv",
url,
},
Labels: labels,
},
HostConfig: &typescontainer.HostConfig{
NetworkMode: "host",
},
})
a.nextIndex++
a.currURLs[url] = struct{}{}
a.wg.Add(1)
go func() {
defer a.wg.Done()
a.destLoop(url, containerStateC, errC)
}()
}
}
// destLoop is the actor loop for a destination stream.
func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, errC <-chan error) {
defer func() {
a.actorC <- func() {
delete(a.currURLs, url)
}
}()
state := &domain.Destination{URL: url}
sendState := func() { a.stateC <- *state }
for {
select {
case containerState := <-containerStateC:
state.Container = containerState
state.Live = containerState.State == "running"
sendState()
case err := <-errC:
// TODO: error handling
if err != nil {
a.logger.Error("Error from container client", "error", err)
}
return
}
}
}
// C returns a channel that will receive the current state of the multiplexer.
// The channel is never closed.
func (a *Actor) C() <-chan domain.Destination {
return a.stateC
}
// Close closes the actor.
func (a *Actor) Close() error {
if err := a.containerClient.RemoveContainers(context.Background(), map[string]string{"component": componentName}); err != nil {
return fmt.Errorf("remove containers: %w", err)
}
a.wg.Wait()
close(a.actorC)
return nil
}
// actorLoop is the main actor loop.
func (a *Actor) actorLoop() {
for act := range a.actorC {
act()
}
}

View File

@ -0,0 +1,85 @@
package multiplexer_test
import (
"context"
"testing"
"time"
"git.netflux.io/rob/termstream/container"
"git.netflux.io/rob/termstream/mediaserver"
"git.netflux.io/rob/termstream/multiplexer"
"git.netflux.io/rob/termstream/testhelpers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const component = "multiplexer"
func TestMultiplexer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
logger := testhelpers.NewTestLogger()
containerClient, err := container.NewClient(ctx, logger)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, containerClient.Close()) })
running, err := containerClient.ContainerRunning(ctx, map[string]string{"component": component})
require.NoError(t, err)
assert.False(t, running)
srv := mediaserver.StartActor(ctx, mediaserver.StartActorParams{
RTMPPort: 1936,
APIPort: 9998,
FetchIngressStateInterval: 250 * time.Millisecond,
ContainerClient: containerClient,
ChanSize: 1,
Logger: logger,
})
defer srv.Close()
testhelpers.ChanDiscard(srv.C())
time.Sleep(2 * time.Second)
testhelpers.StreamFLV(t, srv.State().URL)
require.Eventually(
t,
func() bool { return srv.State().Live },
time.Second*10,
time.Second,
"source not live",
)
mp := multiplexer.NewActor(ctx, multiplexer.NewActorParams{
SourceURL: srv.State().URL,
ChanSize: 1,
ContainerClient: containerClient,
Logger: logger,
})
defer mp.Close()
testhelpers.ChanDiscard(mp.C())
requireListeners(t, srv, 0)
mp.ToggleDestination("rtmp://localhost:1936/destination/test1")
mp.ToggleDestination("rtmp://localhost:1936/destination/test2")
mp.ToggleDestination("rtmp://localhost:1936/destination/test3")
requireListeners(t, srv, 3)
mp.ToggleDestination("rtmp://localhost:1936/destination/test3")
requireListeners(t, srv, 2)
mp.ToggleDestination("rtmp://localhost:1936/destination/test2")
mp.ToggleDestination("rtmp://localhost:1936/destination/test1")
requireListeners(t, srv, 0)
}
func requireListeners(t *testing.T, srv *mediaserver.Actor, expected int) {
require.Eventually(
t,
func() bool { return srv.State().Listeners == expected },
time.Second*10,
time.Second,
"expected %d listeners", expected,
)
}

View File

@ -147,7 +147,7 @@ func (a *Actor) redrawFromState(state domain.AppState) {
a.sourceView.SetCell(1, 0, tview.NewTableCell(state.Source.URL)) a.sourceView.SetCell(1, 0, tview.NewTableCell(state.Source.URL))
if state.Source.Live { if state.Source.Live {
a.sourceView.SetCell(1, 1, tview.NewTableCell("[green]on-air")) a.sourceView.SetCell(1, 1, tview.NewTableCell("[black:green]receiving"))
} else { } else {
a.sourceView.SetCell(1, 1, tview.NewTableCell("[yellow]off-air")) a.sourceView.SetCell(1, 1, tview.NewTableCell("[yellow]off-air"))
} }
@ -162,11 +162,31 @@ func (a *Actor) redrawFromState(state domain.AppState) {
for i, dest := range state.Destinations { for i, dest := range state.Destinations {
a.destView.SetCell(i+1, 0, tview.NewTableCell(dest.URL)) a.destView.SetCell(i+1, 0, tview.NewTableCell(dest.URL))
a.destView.SetCell(i+1, 1, tview.NewTableCell("[yellow]off-air")) if dest.Live {
a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]-")) a.destView.SetCell(i+1, 1, tview.NewTableCell("[black:green]sending"))
a.destView.SetCell(i+1, 3, tview.NewTableCell("[white]-")) } else {
a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]-")) a.destView.SetCell(i+1, 1, tview.NewTableCell("[white]off-air"))
a.destView.SetCell(i+1, 5, tview.NewTableCell("[white]-")) }
a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]"+cmp.Or(dest.Container.State, "-")))
healthState := "-"
if dest.Container.State == "running" {
healthState = "healthy"
}
a.destView.SetCell(i+1, 3, tview.NewTableCell("[white]"+healthState))
cpuPercent := "-"
if dest.Container.State == "running" {
cpuPercent = fmt.Sprintf("%.1f", dest.Container.CPUPercent)
}
memoryUsage := "-"
if dest.Container.State == "running" {
memoryUsage = fmt.Sprintf("%.1f", float64(dest.Container.MemoryUsageBytes)/1024/1024)
}
a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]"+cpuPercent))
a.destView.SetCell(i+1, 5, tview.NewTableCell("[white]"+memoryUsage))
a.destView.SetCell(i+1, 6, tview.NewTableCell("[green]Tab to go live")) a.destView.SetCell(i+1, 6, tview.NewTableCell("[green]Tab to go live"))
} }

39
testhelpers/ffmpeg.go Normal file
View File

@ -0,0 +1,39 @@
package testhelpers
import (
"context"
"os/exec"
"syscall"
"testing"
"github.com/stretchr/testify/require"
)
// StreamFLV streams a test video to the given URL.
func StreamFLV(t *testing.T, destURL string) {
ctx, cancel := context.WithCancel(context.Background())
cmd := exec.CommandContext(
ctx,
"ffmpeg",
"-r", "30",
"-f", "lavfi",
"-i", "testsrc",
"-vf", "scale=1280:960",
"-vcodec", "libx264",
"-x264-params", "keyint=30:scenecut=0", // 1 key frame per second
"-profile:v", "baseline",
"-pix_fmt", "yuv420p",
"-f", "flv",
destURL,
)
require.NoError(t, cmd.Start())
t.Cleanup(func() {
if cmd.Process != nil {
_ = cmd.Process.Signal(syscall.SIGINT)
}
cancel()
})
}