diff --git a/build/mediamtx.Dockerfile b/build/mediamtx.Dockerfile index 0fdd0e4..50193ed 100644 --- a/build/mediamtx.Dockerfile +++ b/build/mediamtx.Dockerfile @@ -7,6 +7,6 @@ RUN apk add --no-cache \ curl COPY --from=mediamtx /mediamtx /usr/bin/mediamtx -COPY --from=mediamtx /mediamtx.yml /mediamtx.yml +COPY build/mediamtx.yml /mediamtx.yml CMD ["/usr/bin/mediamtx"] diff --git a/build/mediamtx.yml b/build/mediamtx.yml new file mode 100644 index 0000000..b55d1b0 --- /dev/null +++ b/build/mediamtx.yml @@ -0,0 +1,705 @@ +############################################### +# Global settings + +# Settings in this section are applied anywhere. + +############################################### +# Global settings -> General + +# Verbosity of the program; available values are "error", "warn", "info", "debug". +logLevel: info +# Destinations of log messages; available values are "stdout", "file" and "syslog". +logDestinations: [stdout] +# If "file" is in logDestinations, this is the file which will receive the logs. +logFile: mediamtx.log + +# Timeout of read operations. +readTimeout: 10s +# Timeout of write operations. +writeTimeout: 10s +# Size of the queue of outgoing packets. +# A higher value allows to increase throughput, a lower value allows to save RAM. +writeQueueSize: 512 +# Maximum size of outgoing UDP packets. +# This can be decreased to avoid fragmentation on networks with a low UDP MTU. +udpMaxPayloadSize: 1472 + +# Command to run when a client connects to the server. +# This is terminated with SIGINT when a client disconnects from the server. +# The following environment variables are available: +# * MTX_CONN_TYPE: connection type +# * MTX_CONN_ID: connection ID +# * RTSP_PORT: RTSP server port +runOnConnect: +# Restart the command if it exits. +runOnConnectRestart: no +# Command to run when a client disconnects from the server. +# Environment variables are the same of runOnConnect. +runOnDisconnect: + +############################################### +# Global settings -> Authentication + +# Authentication method. Available values are: +# * internal: users are stored in the configuration file +# * http: an external HTTP URL is contacted to perform authentication +# * jwt: an external identity server provides authentication through JWTs +authMethod: internal + +# Internal authentication. +# list of users. +authInternalUsers: + # Default unprivileged user. + # Username. 'any' means any user, including anonymous ones. +- user: any + # Password. Not used in case of 'any' user. + pass: + # IPs or networks allowed to use this user. An empty list means any IP. + ips: [] + # List of permissions. + permissions: + # Available actions are: publish, read, playback, api, metrics, pprof. + - action: publish + # Paths can be set to further restrict access to a specific path. + # An empty path means any path. + # Regular expressions can be used by using a tilde as prefix. + path: + - action: read + path: + - action: playback + path: + + # Default administrator. + # This allows to use API, metrics and PPROF without authentication, + # if the IP is localhost. +- user: any + pass: + ips: ['127.0.0.1', '::1', '172.17.0.0/16'] + permissions: + - action: api + - action: metrics + - action: pprof + +# HTTP-based authentication. +# URL called to perform authentication. Every time a user wants +# to authenticate, the server calls this URL with the POST method +# and a body containing: +# { +# "user": "user", +# "password": "password", +# "ip": "ip", +# "action": "publish|read|playback|api|metrics|pprof", +# "path": "path", +# "protocol": "rtsp|rtmp|hls|webrtc|srt", +# "id": "id", +# "query": "query" +# } +# If the response code is 20x, authentication is accepted, otherwise +# it is discarded. +authHTTPAddress: +# Actions to exclude from HTTP-based authentication. +# Format is the same as the one of user permissions. +authHTTPExclude: +- action: api +- action: metrics +- action: pprof + +# JWT-based authentication. +# Users have to login through an external identity server and obtain a JWT. +# This JWT must contain the claim "mediamtx_permissions" with permissions, +# for instance: +# { +# ... +# "mediamtx_permissions": [ +# { +# "action": "publish", +# "path": "somepath" +# } +# ] +# } +# Users are expected to pass the JWT in the Authorization header or as a query parameter. +# This is the JWKS URL that will be used to pull (once) the public key that allows +# to validate JWTs. +authJWTJWKS: +# name of the claim that contains permissions. +authJWTClaimKey: mediamtx_permissions + +############################################### +# Global settings -> Control API + +# Enable controlling the server through the Control API. +api: no +# Address of the Control API listener. +apiAddress: :9997 +# Enable TLS/HTTPS on the Control API server. +apiEncryption: no +# Path to the server key. This is needed only when encryption is yes. +# This can be generated with: +# openssl genrsa -out server.key 2048 +# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 +apiServerKey: server.key +# Path to the server certificate. +apiServerCert: server.crt +# Value of the Access-Control-Allow-Origin header provided in every HTTP response. +apiAllowOrigin: '*' +# List of IPs or CIDRs of proxies placed before the HTTP server. +# If the server receives a request from one of these entries, IP in logs +# will be taken from the X-Forwarded-For header. +apiTrustedProxies: [] + +############################################### +# Global settings -> Metrics + +# Enable Prometheus-compatible metrics. +metrics: no +# Address of the metrics HTTP listener. +metricsAddress: :9998 +# Enable TLS/HTTPS on the Metrics server. +metricsEncryption: no +# Path to the server key. This is needed only when encryption is yes. +# This can be generated with: +# openssl genrsa -out server.key 2048 +# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 +metricsServerKey: server.key +# Path to the server certificate. +metricsServerCert: server.crt +# Value of the Access-Control-Allow-Origin header provided in every HTTP response. +metricsAllowOrigin: '*' +# List of IPs or CIDRs of proxies placed before the HTTP server. +# If the server receives a request from one of these entries, IP in logs +# will be taken from the X-Forwarded-For header. +metricsTrustedProxies: [] + +############################################### +# Global settings -> PPROF + +# Enable pprof-compatible endpoint to monitor performances. +pprof: no +# Address of the pprof listener. +pprofAddress: :9999 +# Enable TLS/HTTPS on the pprof server. +pprofEncryption: no +# Path to the server key. This is needed only when encryption is yes. +# This can be generated with: +# openssl genrsa -out server.key 2048 +# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 +pprofServerKey: server.key +# Path to the server certificate. +pprofServerCert: server.crt +# Value of the Access-Control-Allow-Origin header provided in every HTTP response. +pprofAllowOrigin: '*' +# List of IPs or CIDRs of proxies placed before the HTTP server. +# If the server receives a request from one of these entries, IP in logs +# will be taken from the X-Forwarded-For header. +pprofTrustedProxies: [] + +############################################### +# Global settings -> Playback server + +# Enable downloading recordings from the playback server. +playback: no +# Address of the playback server listener. +playbackAddress: :9996 +# Enable TLS/HTTPS on the playback server. +playbackEncryption: no +# Path to the server key. This is needed only when encryption is yes. +# This can be generated with: +# openssl genrsa -out server.key 2048 +# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 +playbackServerKey: server.key +# Path to the server certificate. +playbackServerCert: server.crt +# Value of the Access-Control-Allow-Origin header provided in every HTTP response. +playbackAllowOrigin: '*' +# List of IPs or CIDRs of proxies placed before the HTTP server. +# If the server receives a request from one of these entries, IP in logs +# will be taken from the X-Forwarded-For header. +playbackTrustedProxies: [] + +############################################### +# Global settings -> RTSP server + +# Enable publishing and reading streams with the RTSP protocol. +rtsp: yes +# List of enabled RTSP transport protocols. +# UDP is the most performant, but doesn't work when there's a NAT/firewall between +# server and clients, and doesn't support encryption. +# UDP-multicast allows to save bandwidth when clients are all in the same LAN. +# TCP is the most versatile, and does support encryption. +# The handshake is always performed with TCP. +rtspTransports: [udp, multicast, tcp] +# Encrypt handshakes and TCP streams with TLS (RTSPS). +# Available values are "no", "strict", "optional". +rtspEncryption: "no" +# Address of the TCP/RTSP listener. This is needed only when encryption is "no" or "optional". +rtspAddress: :8554 +# Address of the TCP/TLS/RTSPS listener. This is needed only when encryption is "strict" or "optional". +rtspsAddress: :8322 +# Address of the UDP/RTP listener. This is needed only when "udp" is in protocols. +rtpAddress: :8000 +# Address of the UDP/RTCP listener. This is needed only when "udp" is in protocols. +rtcpAddress: :8001 +# IP range of all UDP-multicast listeners. This is needed only when "multicast" is in protocols. +multicastIPRange: 224.1.0.0/16 +# Port of all UDP-multicast/RTP listeners. This is needed only when "multicast" is in protocols. +multicastRTPPort: 8002 +# Port of all UDP-multicast/RTCP listeners. This is needed only when "multicast" is in protocols. +multicastRTCPPort: 8003 +# Path to the server key. This is needed only when encryption is "strict" or "optional". +# This can be generated with: +# openssl genrsa -out server.key 2048 +# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 +rtspServerKey: server.key +# Path to the server certificate. This is needed only when encryption is "strict" or "optional". +rtspServerCert: server.crt +# Authentication methods. Available are "basic" and "digest". +# "digest" doesn't provide any additional security and is available for compatibility only. +rtspAuthMethods: [basic] + +############################################### +# Global settings -> RTMP server + +# Enable publishing and reading streams with the RTMP protocol. +rtmp: yes +# Address of the RTMP listener. This is needed only when encryption is "no" or "optional". +rtmpAddress: :1935 +# Encrypt connections with TLS (RTMPS). +# Available values are "no", "strict", "optional". +rtmpEncryption: "no" +# Address of the RTMPS listener. This is needed only when encryption is "strict" or "optional". +rtmpsAddress: :1936 +# Path to the server key. This is needed only when encryption is "strict" or "optional". +# This can be generated with: +# openssl genrsa -out server.key 2048 +# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 +rtmpServerKey: server.key +# Path to the server certificate. This is needed only when encryption is "strict" or "optional". +rtmpServerCert: server.crt + +############################################### +# Global settings -> HLS server + +# Enable reading streams with the HLS protocol. +hls: yes +# Address of the HLS listener. +hlsAddress: :8888 +# Enable TLS/HTTPS on the HLS server. +# This is required for Low-Latency HLS. +hlsEncryption: no +# Path to the server key. This is needed only when encryption is yes. +# This can be generated with: +# openssl genrsa -out server.key 2048 +# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 +hlsServerKey: server.key +# Path to the server certificate. +hlsServerCert: server.crt +# Value of the Access-Control-Allow-Origin header provided in every HTTP response. +# This allows to play the HLS stream from an external website. +hlsAllowOrigin: '*' +# List of IPs or CIDRs of proxies placed before the HLS server. +# If the server receives a request from one of these entries, IP in logs +# will be taken from the X-Forwarded-For header. +hlsTrustedProxies: [] +# By default, HLS is generated only when requested by a user. +# This option allows to generate it always, avoiding the delay between request and generation. +hlsAlwaysRemux: no +# Variant of the HLS protocol to use. Available options are: +# * mpegts - uses MPEG-TS segments, for maximum compatibility. +# * fmp4 - uses fragmented MP4 segments, more efficient. +# * lowLatency - uses Low-Latency HLS. +hlsVariant: lowLatency +# Number of HLS segments to keep on the server. +# Segments allow to seek through the stream. +# Their number doesn't influence latency. +hlsSegmentCount: 7 +# Minimum duration of each segment. +# A player usually puts 3 segments in a buffer before reproducing the stream. +# The final segment duration is also influenced by the interval between IDR frames, +# since the server changes the duration in order to include at least one IDR frame +# in each segment. +hlsSegmentDuration: 1s +# Minimum duration of each part. +# A player usually puts 3 parts in a buffer before reproducing the stream. +# Parts are used in Low-Latency HLS in place of segments. +# Part duration is influenced by the distance between video/audio samples +# and is adjusted in order to produce segments with a similar duration. +hlsPartDuration: 200ms +# Maximum size of each segment. +# This prevents RAM exhaustion. +hlsSegmentMaxSize: 50M +# Directory in which to save segments, instead of keeping them in the RAM. +# This decreases performance, since reading from disk is less performant than +# reading from RAM, but allows to save RAM. +hlsDirectory: '' +# The muxer will be closed when there are no +# reader requests and this amount of time has passed. +hlsMuxerCloseAfter: 60s + +############################################### +# Global settings -> WebRTC server + +# Enable publishing and reading streams with the WebRTC protocol. +webrtc: yes +# Address of the WebRTC HTTP listener. +webrtcAddress: :8889 +# Enable TLS/HTTPS on the WebRTC server. +webrtcEncryption: no +# Path to the server key. +# This can be generated with: +# openssl genrsa -out server.key 2048 +# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 +webrtcServerKey: server.key +# Path to the server certificate. +webrtcServerCert: server.crt +# Value of the Access-Control-Allow-Origin header provided in every HTTP response. +# This allows to play the WebRTC stream from an external website. +webrtcAllowOrigin: '*' +# List of IPs or CIDRs of proxies placed before the WebRTC server. +# If the server receives a request from one of these entries, IP in logs +# will be taken from the X-Forwarded-For header. +webrtcTrustedProxies: [] +# Address of a local UDP listener that will receive connections. +# Use a blank string to disable. +webrtcLocalUDPAddress: :8189 +# Address of a local TCP listener that will receive connections. +# This is disabled by default since TCP is less efficient than UDP and +# introduces a progressive delay when network is congested. +webrtcLocalTCPAddress: '' +# WebRTC clients need to know the IP of the server. +# Gather IPs from interfaces and send them to clients. +webrtcIPsFromInterfaces: yes +# List of interfaces whose IPs will be sent to clients. +# An empty value means to use all available interfaces. +webrtcIPsFromInterfacesList: [] +# List of additional hosts or IPs to send to clients. +webrtcAdditionalHosts: [] +# ICE servers. Needed only when local listeners can't be reached by clients. +# STUN servers allows to obtain and share the public IP of the server. +# TURN/TURNS servers forces all traffic through them. +webrtcICEServers2: [] + # - url: stun:stun.l.google.com:19302 + # if user is "AUTH_SECRET", then authentication is secret based. + # the secret must be inserted into the password field. + # username: '' + # password: '' + # clientOnly: false +# Time to wait for the WebRTC handshake to complete. +webrtcHandshakeTimeout: 10s +# Maximum time to gather video tracks. +webrtcTrackGatherTimeout: 2s + +############################################### +# Global settings -> SRT server + +# Enable publishing and reading streams with the SRT protocol. +srt: yes +# Address of the SRT listener. +srtAddress: :8890 + +############################################### +# Default path settings + +# Settings in "pathDefaults" are applied anywhere, +# unless they are overridden in "paths". +pathDefaults: + + ############################################### + # Default path settings -> General + + # Source of the stream. This can be: + # * publisher -> the stream is provided by a RTSP, RTMP, WebRTC or SRT client + # * rtsp://existing-url -> the stream is pulled from another RTSP server / camera + # * rtsps://existing-url -> the stream is pulled from another RTSP server / camera with RTSPS + # * rtmp://existing-url -> the stream is pulled from another RTMP server / camera + # * rtmps://existing-url -> the stream is pulled from another RTMP server / camera with RTMPS + # * http://existing-url/stream.m3u8 -> the stream is pulled from another HLS server / camera + # * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server / camera with HTTPS + # * udp://ip:port -> the stream is pulled with UDP, by listening on the specified IP and port + # * srt://existing-url -> the stream is pulled from another SRT server / camera + # * whep://existing-url -> the stream is pulled from another WebRTC server / camera + # * wheps://existing-url -> the stream is pulled from another WebRTC server / camera with HTTPS + # * redirect -> the stream is provided by another path or server + # * rpiCamera -> the stream is provided by a Raspberry Pi Camera + # The following variables can be used in the source string: + # * $MTX_QUERY: query parameters (passed by first reader) + # * $G1, $G2, ...: regular expression groups, if path name is + # a regular expression. + source: publisher + # If the source is a URL, and the source certificate is self-signed + # or invalid, you can provide the fingerprint of the certificate in order to + # validate it anyway. It can be obtained by running: + # openssl s_client -connect source_ip:source_port /dev/null | sed -n '/BEGIN/,/END/p' > server.crt + # openssl x509 -in server.crt -noout -fingerprint -sha256 | cut -d "=" -f2 | tr -d ':' + sourceFingerprint: + # If the source is a URL, it will be pulled only when at least + # one reader is connected, saving bandwidth. + sourceOnDemand: no + # If sourceOnDemand is "yes", readers will be put on hold until the source is + # ready or until this amount of time has passed. + sourceOnDemandStartTimeout: 10s + # If sourceOnDemand is "yes", the source will be closed when there are no + # readers connected and this amount of time has passed. + sourceOnDemandCloseAfter: 10s + # Maximum number of readers. Zero means no limit. + maxReaders: 0 + # SRT encryption passphrase require to read from this path + srtReadPassphrase: + # If the stream is not available, redirect readers to this path. + # It can be can be a relative path (i.e. /otherstream) or an absolute RTSP URL. + fallback: + + ############################################### + # Default path settings -> Record + + # Record streams to disk. + record: no + # Path of recording segments. + # Extension is added automatically. + # Available variables are %path (path name), %Y %m %d %H %M %S %f %s (time in strftime format) + recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f + # Format of recorded segments. + # Available formats are "fmp4" (fragmented MP4) and "mpegts" (MPEG-TS). + recordFormat: fmp4 + # fMP4 segments are concatenation of small MP4 files (parts), each with this duration. + # MPEG-TS segments are concatenation of 188-bytes packets, flushed to disk with this period. + # When a system failure occurs, the last part gets lost. + # Therefore, the part duration is equal to the RPO (recovery point objective). + recordPartDuration: 1s + # Minimum duration of each segment. + recordSegmentDuration: 1h + # Delete segments after this timespan. + # Set to 0s to disable automatic deletion. + recordDeleteAfter: 24h + + ############################################### + # Default path settings -> Publisher source (when source is "publisher") + + # Allow another client to disconnect the current publisher and publish in its place. + overridePublisher: yes + # SRT encryption passphrase required to publish to this path + srtPublishPassphrase: + + ############################################### + # Default path settings -> RTSP source (when source is a RTSP or a RTSPS URL) + + # Transport protocol used to pull the stream. available values are "automatic", "udp", "multicast", "tcp". + rtspTransport: automatic + # Support sources that don't provide server ports or use random server ports. This is a security issue + # and must be used only when interacting with sources that require it. + rtspAnyPort: no + # Range header to send to the source, in order to start streaming from the specified offset. + # available values: + # * clock: Absolute time + # * npt: Normal Play Time + # * smpte: SMPTE timestamps relative to the start of the recording + rtspRangeType: + # Available values: + # * clock: UTC ISO 8601 combined date and time string, e.g. 20230812T120000Z + # * npt: duration such as "300ms", "1.5m" or "2h45m", valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h" + # * smpte: duration such as "300ms", "1.5m" or "2h45m", valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h" + rtspRangeStart: + + ############################################### + # Default path settings -> Redirect source (when source is "redirect") + + # RTSP URL which clients will be redirected to. + sourceRedirect: + + ############################################### + # Default path settings -> Raspberry Pi Camera source (when source is "rpiCamera") + + # ID of the camera + rpiCameraCamID: 0 + # Width of frames + rpiCameraWidth: 1920 + # Height of frames + rpiCameraHeight: 1080 + # Flip horizontally + rpiCameraHFlip: false + # Flip vertically + rpiCameraVFlip: false + # Brightness [-1, 1] + rpiCameraBrightness: 0 + # Contrast [0, 16] + rpiCameraContrast: 1 + # Saturation [0, 16] + rpiCameraSaturation: 1 + # Sharpness [0, 16] + rpiCameraSharpness: 1 + # Exposure mode. + # values: normal, short, long, custom + rpiCameraExposure: normal + # Auto-white-balance mode. + # values: auto, incandescent, tungsten, fluorescent, indoor, daylight, cloudy, custom + rpiCameraAWB: auto + # Auto-white-balance fixed gains. This can be used in place of rpiCameraAWB. + # format: [red,blue] + rpiCameraAWBGains: [0, 0] + # Denoise operating mode. + # values: off, cdn_off, cdn_fast, cdn_hq + rpiCameraDenoise: "off" + # Fixed shutter speed, in microseconds. + rpiCameraShutter: 0 + # Metering mode of the AEC/AGC algorithm. + # values: centre, spot, matrix, custom + rpiCameraMetering: centre + # Fixed gain + rpiCameraGain: 0 + # EV compensation of the image [-10, 10] + rpiCameraEV: 0 + # Region of interest, in format x,y,width,height (all normalized between 0 and 1) + rpiCameraROI: + # Whether to enable HDR on Raspberry Camera 3. + rpiCameraHDR: false + # Tuning file + rpiCameraTuningFile: + # Sensor mode, in format [width]:[height]:[bit-depth]:[packing] + # bit-depth and packing are optional. + rpiCameraMode: + # frames per second + rpiCameraFPS: 30 + # Autofocus mode + # values: auto, manual, continuous + rpiCameraAfMode: continuous + # Autofocus range + # values: normal, macro, full + rpiCameraAfRange: normal + # Autofocus speed + # values: normal, fast + rpiCameraAfSpeed: normal + # Lens position (for manual autofocus only), will be set to focus to a specific distance + # calculated by the following formula: d = 1 / value + # Examples: 0 moves the lens to infinity. + # 0.5 moves the lens to focus on objects 2m away. + # 2 moves the lens to focus on objects 50cm away. + rpiCameraLensPosition: 0.0 + # Specifies the autofocus window, in the form x,y,width,height where the coordinates + # are given as a proportion of the entire image. + rpiCameraAfWindow: + # Manual flicker correction period, in microseconds. + rpiCameraFlickerPeriod: 0 + # Enables printing text on each frame. + rpiCameraTextOverlayEnable: false + # Text that is printed on each frame. + # format is the one of the strftime() function. + rpiCameraTextOverlay: '%Y-%m-%d %H:%M:%S - MediaMTX' + # Codec. Available values: auto, hardwareH264, softwareH264 + rpiCameraCodec: auto + # Period between IDR frames + rpiCameraIDRPeriod: 60 + # Bitrate + rpiCameraBitrate: 5000000 + # H264 profile + rpiCameraProfile: main + # H264 level + rpiCameraLevel: '4.1' + + ############################################### + # Default path settings -> Hooks + + # Command to run when this path is initialized. + # This can be used to publish a stream when the server is launched. + # This is terminated with SIGINT when the program closes. + # The following environment variables are available: + # * MTX_PATH: path name + # * RTSP_PORT: RTSP server port + # * G1, G2, ...: regular expression groups, if path name is + # a regular expression. + runOnInit: + # Restart the command if it exits. + runOnInitRestart: no + + # Command to run when this path is requested by a reader + # and no one is publishing to this path yet. + # This can be used to publish a stream on demand. + # This is terminated with SIGINT when there are no readers anymore. + # The following environment variables are available: + # * MTX_PATH: path name + # * MTX_QUERY: query parameters (passed by first reader) + # * RTSP_PORT: RTSP server port + # * G1, G2, ...: regular expression groups, if path name is + # a regular expression. + runOnDemand: + # Restart the command if it exits. + runOnDemandRestart: no + # Readers will be put on hold until the runOnDemand command starts publishing + # or until this amount of time has passed. + runOnDemandStartTimeout: 10s + # The command will be closed when there are no + # readers connected and this amount of time has passed. + runOnDemandCloseAfter: 10s + # Command to run when there are no readers anymore. + # Environment variables are the same of runOnDemand. + runOnUnDemand: + + # Command to run when the stream is ready to be read, whenever it is + # published by a client or pulled from a server / camera. + # This is terminated with SIGINT when the stream is not ready anymore. + # The following environment variables are available: + # * MTX_PATH: path name + # * MTX_QUERY: query parameters (passed by publisher) + # * MTX_SOURCE_TYPE: source type + # * MTX_SOURCE_ID: source ID + # * RTSP_PORT: RTSP server port + # * G1, G2, ...: regular expression groups, if path name is + # a regular expression. + runOnReady: + # Restart the command if it exits. + runOnReadyRestart: no + # Command to run when the stream is not available anymore. + # Environment variables are the same of runOnReady. + runOnNotReady: + + # Command to run when a client starts reading. + # This is terminated with SIGINT when a client stops reading. + # The following environment variables are available: + # * MTX_PATH: path name + # * MTX_QUERY: query parameters (passed by reader) + # * MTX_READER_TYPE: reader type + # * MTX_READER_ID: reader ID + # * RTSP_PORT: RTSP server port + # * G1, G2, ...: regular expression groups, if path name is + # a regular expression. + runOnRead: + # Restart the command if it exits. + runOnReadRestart: no + # Command to run when a client stops reading. + # Environment variables are the same of runOnRead. + runOnUnread: + + # Command to run when a recording segment is created. + # The following environment variables are available: + # * MTX_PATH: path name + # * MTX_SEGMENT_PATH: segment file path + # * RTSP_PORT: RTSP server port + # * G1, G2, ...: regular expression groups, if path name is + # a regular expression. + runOnRecordSegmentCreate: + + # Command to run when a recording segment is complete. + # The following environment variables are available: + # * MTX_PATH: path name + # * MTX_SEGMENT_PATH: segment file path + # * MTX_SEGMENT_DURATION: segment duration + # * RTSP_PORT: RTSP server port + # * G1, G2, ...: regular expression groups, if path name is + # a regular expression. + runOnRecordSegmentComplete: + +############################################### +# Path settings + +# Settings in "paths" are applied to specific paths, and the map key +# is the name of the path. +# Any setting in "pathDefaults" can be overridden here. +# It's possible to use regular expressions by using a tilde as prefix, +# for example "~^(test1|test2)$" will match both "test1" and "test2", +# for example "~^prefix" will match all paths that start with "prefix". +paths: + # example: + # my_camera: + # source: rtsp://my_camera + + # Settings under path "all_others" are applied to all paths that + # do not match another entry. + all_others: diff --git a/container/container.go b/container/container.go index ffd10ff..0c78050 100644 --- a/container/container.go +++ b/container/container.go @@ -175,9 +175,8 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (< now := time.Now() containerStateC := make(chan domain.Container, cmp.Or(params.ChanSize, defaultChanSize)) errC := make(chan error, 1) - closeWithError := func(err error) { + sendError := func(err error) { errC <- err - close(errC) } a.wg.Add(1) @@ -189,7 +188,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (< pullReader, err := a.apiClient.ImagePull(ctx, params.ContainerConfig.Image, image.PullOptions{}) if err != nil { - closeWithError(fmt.Errorf("image pull: %w", err)) + sendError(fmt.Errorf("image pull: %w", err)) return } _, _ = io.Copy(io.Discard, pullReader) @@ -212,16 +211,17 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (< name, ) if err != nil { - closeWithError(fmt.Errorf("container create: %w", err)) + sendError(fmt.Errorf("container create: %w", err)) return } containerStateC <- domain.Container{ID: createResp.ID, State: "created"} if err = a.apiClient.ContainerStart(ctx, createResp.ID, container.StartOptions{}); err != nil { - closeWithError(fmt.Errorf("container start: %w", err)) + sendError(fmt.Errorf("container start: %w", err)) return } a.logger.Info("Started container", "id", shortID(createResp.ID), "duration", time.Since(now)) + containerStateC <- domain.Container{ID: createResp.ID, State: "running"} a.runContainerLoop(ctx, createResp.ID, containerStateC, errC) @@ -245,6 +245,8 @@ func (a *Client) runContainerLoop(ctx context.Context, containerID string, state select { case resp := <-containerRespC: a.logger.Info("Container entered non-running state", "exit_code", resp.StatusCode, "id", shortID(containerID)) + state.State = "exited" + sendState() return case err := <-containerErrC: // TODO: error handling? diff --git a/container/container_test.go b/container/container_test.go index 7399aa9..050c288 100644 --- a/container/container_test.go +++ b/container/container_test.go @@ -113,7 +113,7 @@ func TestClientRemoveContainers(t *testing.T) { return running }, 5*time.Second, - 100*time.Millisecond, + 500*time.Millisecond, "container group 1 not in RUNNING state", ) // check all containers in group 2 are running @@ -124,7 +124,7 @@ func TestClientRemoveContainers(t *testing.T) { return running }, 2*time.Second, - 100*time.Millisecond, + 500*time.Millisecond, "container group 2 not in RUNNING state", ) @@ -141,7 +141,7 @@ func TestClientRemoveContainers(t *testing.T) { return err == nil && !running }, 2*time.Second, - 100*time.Millisecond, + 500*time.Millisecond, "container group 1 still in RUNNING state", ) diff --git a/domain/types.go b/domain/types.go index ece5eb7..f0aa815 100644 --- a/domain/types.go +++ b/domain/types.go @@ -10,6 +10,7 @@ type AppState struct { type Source struct { Container Container Live bool + Listeners int URL string } diff --git a/go.mod b/go.mod index 63de494..92c4d23 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.23.5 require ( github.com/docker/docker v27.5.0+incompatible + github.com/docker/go-connections v0.5.0 github.com/gdamore/tcell/v2 v2.7.1 github.com/google/uuid v1.6.0 github.com/rivo/tview v0.0.0-20241227133733-17b7edb88c57 @@ -16,7 +17,6 @@ require ( github.com/containerd/log v0.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.6.0 // indirect - github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gdamore/encoding v1.0.0 // indirect diff --git a/main.go b/main.go index 6145ebf..f4ea960 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "git.netflux.io/rob/termstream/container" "git.netflux.io/rob/termstream/domain" "git.netflux.io/rob/termstream/mediaserver" + "git.netflux.io/rob/termstream/multiplexer" "git.netflux.io/rob/termstream/terminal" ) @@ -42,6 +43,15 @@ func run(ctx context.Context, cfgReader io.Reader) error { logger := slog.New(slog.NewTextHandler(logFile, nil)) logger.Info("Starting termstream", slog.Any("initial_state", state)) + ui, err := terminal.StartActor(ctx, terminal.StartActorParams{Logger: logger.With("component", "ui")}) + if err != nil { + return fmt.Errorf("start tui: %w", err) + } + defer ui.Close() + + updateUI := func() { ui.SetState(*state) } + updateUI() + containerClient, err := container.NewClient(ctx, logger.With("component", "container_client")) if err != nil { return fmt.Errorf("new container client: %w", err) @@ -52,15 +62,14 @@ func run(ctx context.Context, cfgReader io.Reader) error { ContainerClient: containerClient, Logger: logger.With("component", "mediaserver"), }) + defer srv.Close() - ui, err := terminal.StartActor(ctx, terminal.StartActorParams{Logger: logger.With("component", "ui")}) - if err != nil { - return fmt.Errorf("start tui: %w", err) - } - defer ui.Close() - - updateUI := func() { ui.SetState(*state) } - updateUI() + mp := multiplexer.NewActor(ctx, multiplexer.NewActorParams{ + SourceURL: srv.State().URL, + ContainerClient: containerClient, + Logger: logger.With("component", "multiplexer"), + }) + defer mp.Close() uiTicker := time.NewTicker(uiUpdateInterval) defer uiTicker.Stop() @@ -73,13 +82,21 @@ func run(ctx context.Context, cfgReader io.Reader) error { logger.Info("UI closed") return nil } + logger.Info("Command received", "cmd", cmd) + switch c := cmd.(type) { + case terminal.CommandToggleDestination: + mp.ToggleDestination(c.URL) + } case <-uiTicker.C: // TODO: update UI with current state? updateUI() case serverState := <-srv.C(): applyServerState(serverState, state) updateUI() + case mpState := <-mp.C(): + applyMultiplexerState(mpState, state) + updateUI() } } } @@ -89,6 +106,18 @@ func applyServerState(serverState domain.Source, appState *domain.AppState) { appState.Source = serverState } +func applyMultiplexerState(destination domain.Destination, appState *domain.AppState) { + for i, dest := range appState.Destinations { + if dest.URL != destination.URL { + continue + } + + appState.Destinations[i] = destination + + break + } +} + // applyConfig applies the configuration to the app state. func applyConfig(cfg config.Config, appState *domain.AppState) { appState.Destinations = make([]domain.Destination, 0, len(cfg.Destinations)) diff --git a/mediaserver/actor.go b/mediaserver/actor.go index a1311b7..91821d0 100644 --- a/mediaserver/actor.go +++ b/mediaserver/actor.go @@ -3,22 +3,28 @@ package mediaserver import ( "cmp" "context" - "encoding/json" "fmt" - "io" "log/slog" "net/http" + "strconv" "time" typescontainer "github.com/docker/docker/api/types/container" + "github.com/docker/go-connections/nat" "git.netflux.io/rob/termstream/container" "git.netflux.io/rob/termstream/domain" ) const ( - imageNameMediaMTX = "netfluxio/mediamtx-alpine:latest" - rtmpPath = "live" + defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server + defaultAPIPort = 9997 // default API port for the media server + defaultRTMPPort = 1935 // default RTMP port for the media server + defaultChanSize = 64 // default channel size for asynchronous non-error channels + imageNameMediaMTX = "netfluxio/mediamtx-alpine:latest" // image name for mediamtx + rtmpPath = "live" // RTMP path for the media server + componentName = "mediaserver" // component name, mostly used for Docker labels + httpClientTimeout = time.Second // timeout for outgoing HTTP client requests ) // action is an action to be performed by the actor. @@ -26,11 +32,14 @@ type action func() // Actor is responsible for managing the media server. type Actor struct { - actorC chan action - stateC chan domain.Source - containerClient *container.Client - logger *slog.Logger - httpClient *http.Client + actorC chan action + stateC chan domain.Source + containerClient *container.Client + apiPort int + rtmpPort int + fetchIngressStateInterval time.Duration + logger *slog.Logger + httpClient *http.Client // mutable state state *domain.Source @@ -39,17 +48,14 @@ type Actor struct { // StartActorParams contains the parameters for starting a new media server // actor. type StartActorParams struct { - ContainerClient *container.Client - ChanSize int - Logger *slog.Logger + APIPort int // defaults to 9997 + RTMPPort int // defaults to 1935 + ChanSize int // defaults to 64 + FetchIngressStateInterval time.Duration // defaults to 5 seconds + ContainerClient *container.Client + Logger *slog.Logger } -const ( - defaultChanSize = 64 - componentName = "mediaserver" - httpClientTimeout = time.Second -) - // StartActor starts a new media server actor. // // Callers must consume the state channel exposed via [C]. @@ -57,18 +63,25 @@ func StartActor(ctx context.Context, params StartActorParams) *Actor { chanSize := cmp.Or(params.ChanSize, defaultChanSize) actor := &Actor{ - actorC: make(chan action, chanSize), - state: new(domain.Source), - stateC: make(chan domain.Source, chanSize), - containerClient: params.ContainerClient, - logger: params.Logger, - httpClient: &http.Client{Timeout: httpClientTimeout}, + apiPort: cmp.Or(params.APIPort, defaultAPIPort), + rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort), + fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval), + actorC: make(chan action, chanSize), + state: new(domain.Source), + stateC: make(chan domain.Source, chanSize), + containerClient: params.ContainerClient, + logger: params.Logger, + httpClient: &http.Client{Timeout: httpClientTimeout}, } + apiPortSpec := nat.Port(strconv.Itoa(actor.apiPort) + ":9997") + rtmpPortSpec := nat.Port(strconv.Itoa(actor.rtmpPort) + ":1935") + exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)}) + containerStateC, errC := params.ContainerClient.RunContainer( ctx, container.RunContainerParams{ - Name: "server", + Name: componentName, ChanSize: chanSize, ContainerConfig: &typescontainer.Config{ Image: imageNameMediaMTX, @@ -86,14 +99,16 @@ func StartActor(ctx context.Context, params StartActorParams) *Actor { StartInterval: time.Second * 2, Retries: 2, }, + ExposedPorts: exposedPorts, }, HostConfig: &typescontainer.HostConfig{ - NetworkMode: "host", + NetworkMode: "default", + PortBindings: portBindings, }, }, ) - actor.state.URL = "rtmp://localhost:1935/" + rtmpPath + actor.state.URL = actor.rtmpURL() go actor.actorLoop(containerStateC, errC) @@ -128,8 +143,8 @@ func (s *Actor) Close() error { // actorLoop is the main loop of the media server actor. It only exits when the // actor is closed. func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan error) { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() + fetchStateT := time.NewTicker(s.fetchIngressStateInterval) + defer fetchStateT.Stop() sendState := func() { s.stateC <- *s.state } @@ -145,20 +160,21 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e s.logger.Error("Error from container client", "error", err, "id", shortID(s.state.Container.ID)) } - ticker.Stop() + fetchStateT.Stop() if s.state.Live { s.state.Live = false sendState() } - case <-ticker.C: - ingressLive, err := s.fetchIngressStateFromServer() + case <-fetchStateT.C: + ingressState, err := s.fetchIngressState() if err != nil { s.logger.Error("Error fetching server state", "error", err) continue } - if ingressLive != s.state.Live { - s.state.Live = ingressLive + if ingressState.ready != s.state.Live || ingressState.listeners != s.state.Listeners { + s.state.Live = ingressState.ready + s.state.Listeners = ingressState.listeners sendState() } case action, ok := <-s.actorC: @@ -170,54 +186,17 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e } } -type apiResponse[T any] struct { - Items []T `json:"items"` +// rtmpURL returns the RTMP URL for the media server, accessible from the host. +func (s *Actor) rtmpURL() string { + return fmt.Sprintf("rtmp://localhost:%d/%s", s.rtmpPort, rtmpPath) } -type rtmpConnsResponse struct { - ID string `json:"id"` - CreatedAt time.Time `json:"created"` - State string `json:"state"` - Path string `json:"path"` - BytesReceived int64 `json:"bytesReceived"` - BytesSent int64 `json:"bytesSent"` - RemoteAddr string `json:"remoteAddr"` -} - -func (s *Actor) fetchIngressStateFromServer() (bool, error) { - req, err := http.NewRequest(http.MethodGet, "http://localhost:9997/v3/rtmpconns/list", nil) - if err != nil { - return false, fmt.Errorf("new request: %w", err) - } - - httpResp, err := s.httpClient.Do(req) - if err != nil { - return false, fmt.Errorf("do request: %w", err) - } - - if httpResp.StatusCode != http.StatusOK { - return false, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode) - } - - respBody, err := io.ReadAll(httpResp.Body) - if err != nil { - return false, fmt.Errorf("read body: %w", err) - } - - var resp apiResponse[rtmpConnsResponse] - if err = json.Unmarshal(respBody, &resp); err != nil { - return false, fmt.Errorf("unmarshal: %w", err) - } - - for _, conn := range resp.Items { - if conn.Path == rtmpPath && conn.State == "publish" { - return true, nil - } - } - - return false, nil +// apiURL returns the API URL for the media server, accessible from the host. +func (s *Actor) apiURL() string { + return fmt.Sprintf("http://localhost:%d/v3/rtmpconns/list", s.apiPort) } +// shortID returns the first 12 characters of the given container ID. func shortID(id string) string { if len(id) < 12 { return id diff --git a/mediaserver/actor_test.go b/mediaserver/actor_test.go index f268979..859855c 100644 --- a/mediaserver/actor_test.go +++ b/mediaserver/actor_test.go @@ -2,8 +2,6 @@ package mediaserver_test import ( "context" - "os/exec" - "syscall" "testing" "time" @@ -30,9 +28,10 @@ func TestMediaServerStartStop(t *testing.T) { assert.False(t, running) mediaServer := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ - ChanSize: 1, - ContainerClient: containerClient, - Logger: logger, + FetchIngressStateInterval: 500 * time.Millisecond, + ChanSize: 1, + ContainerClient: containerClient, + Logger: logger, }) require.NoError(t, err) testhelpers.ChanDiscard(mediaServer.C()) @@ -43,8 +42,8 @@ func TestMediaServerStartStop(t *testing.T) { running, err = containerClient.ContainerRunning(ctx, map[string]string{"component": component}) return err == nil && running }, - 5*time.Second, - 250*time.Millisecond, + time.Second*10, + time.Second, "container not in RUNNING state", ) @@ -52,15 +51,16 @@ func TestMediaServerStartStop(t *testing.T) { assert.False(t, state.Live) assert.Equal(t, "rtmp://localhost:1935/live", state.URL) - launchFFMPEG(t, "rtmp://localhost:1935/live") + testhelpers.StreamFLV(t, "rtmp://localhost:1935/live") + require.Eventually( t, func() bool { currState := mediaServer.State() return currState.Live && currState.Container.HealthState == "healthy" }, - 5*time.Second, - 250*time.Millisecond, + time.Second*5, + time.Second, "actor not healthy and/or in LIVE state", ) @@ -70,33 +70,3 @@ func TestMediaServerStartStop(t *testing.T) { require.NoError(t, err) assert.False(t, running) } - -func launchFFMPEG(t *testing.T, destURL string) *exec.Cmd { - ctx, cancel := context.WithCancel(context.Background()) - - cmd := exec.CommandContext( - ctx, - "ffmpeg", - "-r", "30", - "-f", "lavfi", - "-i", "testsrc", - "-vf", "scale=1280:960", - "-vcodec", "libx264", - "-profile:v", "baseline", - "-pix_fmt", "yuv420p", - "-f", "flv", - destURL, - ) - - require.NoError(t, cmd.Start()) - - t.Cleanup(func() { - if cmd.Process != nil { - _ = cmd.Process.Signal(syscall.SIGINT) - } - - cancel() - }) - - return cmd -} diff --git a/mediaserver/api.go b/mediaserver/api.go new file mode 100644 index 0000000..b71bca0 --- /dev/null +++ b/mediaserver/api.go @@ -0,0 +1,72 @@ +package mediaserver + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +type apiResponse[T any] struct { + Items []T `json:"items"` +} + +type rtmpConnsResponse struct { + ID string `json:"id"` + CreatedAt time.Time `json:"created"` + State string `json:"state"` + Path string `json:"path"` + BytesReceived int64 `json:"bytesReceived"` + BytesSent int64 `json:"bytesSent"` + RemoteAddr string `json:"remoteAddr"` +} + +type ingressStreamState struct { + ready bool + listeners int +} + +func (s *Actor) fetchIngressState() (state ingressStreamState, _ error) { + req, err := http.NewRequest(http.MethodGet, s.apiURL(), nil) + if err != nil { + return state, fmt.Errorf("new request: %w", err) + } + + httpResp, err := s.httpClient.Do(req) + if err != nil { + return state, fmt.Errorf("do request: %w", err) + } + + if httpResp.StatusCode != http.StatusOK { + return state, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode) + } + + respBody, err := io.ReadAll(httpResp.Body) + if err != nil { + return state, fmt.Errorf("read body: %w", err) + } + + var resp apiResponse[rtmpConnsResponse] + if err = json.Unmarshal(respBody, &resp); err != nil { + return state, fmt.Errorf("unmarshal: %w", err) + } + + for _, conn := range resp.Items { + if conn.Path != rtmpPath { + continue + } + + switch conn.State { + case "publish": + // mediamtx may report a stream as being in publish state via the API, + // but still refuse to serve them due to being unpublished. This seems to + // be a bug, this is a hacky workaround. + state.ready = conn.BytesReceived > 20_000 + case "read": + state.listeners++ + } + } + + return state, nil +} diff --git a/mise/tasks/push_mediamtx_image b/mise/tasks/push_mediamtx_image new file mode 100755 index 0000000..9f72d71 --- /dev/null +++ b/mise/tasks/push_mediamtx_image @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +#MISE description="Build and push mediamtx image" + +set -euo pipefail + +docker build -f build/mediamtx.Dockerfile -t netfluxio/mediamtx-alpine:latest . +docker push netfluxio/mediamtx-alpine:latest diff --git a/multiplexer/multiplexer.go b/multiplexer/multiplexer.go new file mode 100644 index 0000000..be413f7 --- /dev/null +++ b/multiplexer/multiplexer.go @@ -0,0 +1,170 @@ +package multiplexer + +import ( + "cmp" + "context" + "fmt" + "log/slog" + "strconv" + "sync" + + typescontainer "github.com/docker/docker/api/types/container" + + "git.netflux.io/rob/termstream/container" + "git.netflux.io/rob/termstream/domain" +) + +type action func() + +const ( + defaultChanSize = 64 // default channel size for asynchronous non-error channels + componentName = "multiplexer" // component name, mostly used for Docker labels + imageNameFFMPEG = "ghcr.io/jrottenberg/ffmpeg:7.1-scratch" // image name for ffmpeg +) + +// Actor is responsible for managing the multiplexer. +type Actor struct { + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + sourceURL string + containerClient *container.Client + logger *slog.Logger + actorC chan action + stateC chan domain.Destination + + // mutable state + currURLs map[string]struct{} + nextIndex int +} + +// NewActorParams contains the parameters for starting a new multiplexer actor. +type NewActorParams struct { + SourceURL string + ChanSize int + ContainerClient *container.Client + Logger *slog.Logger +} + +// NewActor starts a new multiplexer actor. +// +// The channel exposed by [C] must be consumed by the caller. +func NewActor(ctx context.Context, params NewActorParams) *Actor { + ctx, cancel := context.WithCancel(ctx) + + actor := &Actor{ + ctx: ctx, + cancel: cancel, + sourceURL: params.SourceURL, + containerClient: params.ContainerClient, + logger: params.Logger, + actorC: make(chan action, cmp.Or(params.ChanSize, defaultChanSize)), + stateC: make(chan domain.Destination, cmp.Or(params.ChanSize, defaultChanSize)), + currURLs: make(map[string]struct{}), + } + + go actor.actorLoop() + + return actor +} + +// ToggleDestination toggles the destination stream between on and off. +func (a *Actor) ToggleDestination(url string) { + a.actorC <- func() { + labels := map[string]string{"component": componentName, "url": url} + + if _, ok := a.currURLs[url]; ok { + a.logger.Info("Stopping live stream", "url", url) + + if err := a.containerClient.RemoveContainers(a.ctx, labels); err != nil { + // TODO: error handling + a.logger.Error("Failed to stop live stream", "url", url, "error", err) + } + + delete(a.currURLs, url) + return + } + + a.logger.Info("Starting live stream", "url", url) + + containerStateC, errC := a.containerClient.RunContainer(a.ctx, container.RunContainerParams{ + Name: componentName + "-" + strconv.Itoa(a.nextIndex), + ContainerConfig: &typescontainer.Config{ + Image: imageNameFFMPEG, + Cmd: []string{ + "-i", a.sourceURL, + "-c", "copy", + "-f", "flv", + url, + }, + Labels: labels, + }, + HostConfig: &typescontainer.HostConfig{ + NetworkMode: "host", + }, + }) + + a.nextIndex++ + a.currURLs[url] = struct{}{} + + a.wg.Add(1) + go func() { + defer a.wg.Done() + + a.destLoop(url, containerStateC, errC) + }() + } +} + +// destLoop is the actor loop for a destination stream. +func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, errC <-chan error) { + defer func() { + a.actorC <- func() { + delete(a.currURLs, url) + } + }() + + state := &domain.Destination{URL: url} + sendState := func() { a.stateC <- *state } + + for { + select { + case containerState := <-containerStateC: + state.Container = containerState + state.Live = containerState.State == "running" + sendState() + case err := <-errC: + // TODO: error handling + if err != nil { + a.logger.Error("Error from container client", "error", err) + } + return + } + } +} + +// C returns a channel that will receive the current state of the multiplexer. +// The channel is never closed. +func (a *Actor) C() <-chan domain.Destination { + return a.stateC +} + +// Close closes the actor. +func (a *Actor) Close() error { + if err := a.containerClient.RemoveContainers(context.Background(), map[string]string{"component": componentName}); err != nil { + return fmt.Errorf("remove containers: %w", err) + } + + a.wg.Wait() + + close(a.actorC) + + return nil +} + +// actorLoop is the main actor loop. +func (a *Actor) actorLoop() { + for act := range a.actorC { + act() + } +} diff --git a/multiplexer/multiplexer_test.go b/multiplexer/multiplexer_test.go new file mode 100644 index 0000000..695f84a --- /dev/null +++ b/multiplexer/multiplexer_test.go @@ -0,0 +1,85 @@ +package multiplexer_test + +import ( + "context" + "testing" + "time" + + "git.netflux.io/rob/termstream/container" + "git.netflux.io/rob/termstream/mediaserver" + "git.netflux.io/rob/termstream/multiplexer" + "git.netflux.io/rob/termstream/testhelpers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const component = "multiplexer" + +func TestMultiplexer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + logger := testhelpers.NewTestLogger() + containerClient, err := container.NewClient(ctx, logger) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, containerClient.Close()) }) + + running, err := containerClient.ContainerRunning(ctx, map[string]string{"component": component}) + require.NoError(t, err) + assert.False(t, running) + + srv := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ + RTMPPort: 1936, + APIPort: 9998, + FetchIngressStateInterval: 250 * time.Millisecond, + ContainerClient: containerClient, + ChanSize: 1, + Logger: logger, + }) + defer srv.Close() + testhelpers.ChanDiscard(srv.C()) + + time.Sleep(2 * time.Second) + testhelpers.StreamFLV(t, srv.State().URL) + + require.Eventually( + t, + func() bool { return srv.State().Live }, + time.Second*10, + time.Second, + "source not live", + ) + + mp := multiplexer.NewActor(ctx, multiplexer.NewActorParams{ + SourceURL: srv.State().URL, + ChanSize: 1, + ContainerClient: containerClient, + Logger: logger, + }) + defer mp.Close() + testhelpers.ChanDiscard(mp.C()) + + requireListeners(t, srv, 0) + + mp.ToggleDestination("rtmp://localhost:1936/destination/test1") + mp.ToggleDestination("rtmp://localhost:1936/destination/test2") + mp.ToggleDestination("rtmp://localhost:1936/destination/test3") + requireListeners(t, srv, 3) + + mp.ToggleDestination("rtmp://localhost:1936/destination/test3") + requireListeners(t, srv, 2) + + mp.ToggleDestination("rtmp://localhost:1936/destination/test2") + mp.ToggleDestination("rtmp://localhost:1936/destination/test1") + requireListeners(t, srv, 0) +} + +func requireListeners(t *testing.T, srv *mediaserver.Actor, expected int) { + require.Eventually( + t, + func() bool { return srv.State().Listeners == expected }, + time.Second*10, + time.Second, + "expected %d listeners", expected, + ) +} diff --git a/terminal/actor.go b/terminal/actor.go index 98e39d8..fd313eb 100644 --- a/terminal/actor.go +++ b/terminal/actor.go @@ -147,7 +147,7 @@ func (a *Actor) redrawFromState(state domain.AppState) { a.sourceView.SetCell(1, 0, tview.NewTableCell(state.Source.URL)) if state.Source.Live { - a.sourceView.SetCell(1, 1, tview.NewTableCell("[green]on-air")) + a.sourceView.SetCell(1, 1, tview.NewTableCell("[black:green]receiving")) } else { a.sourceView.SetCell(1, 1, tview.NewTableCell("[yellow]off-air")) } @@ -162,11 +162,31 @@ func (a *Actor) redrawFromState(state domain.AppState) { for i, dest := range state.Destinations { a.destView.SetCell(i+1, 0, tview.NewTableCell(dest.URL)) - a.destView.SetCell(i+1, 1, tview.NewTableCell("[yellow]off-air")) - a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]-")) - a.destView.SetCell(i+1, 3, tview.NewTableCell("[white]-")) - a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]-")) - a.destView.SetCell(i+1, 5, tview.NewTableCell("[white]-")) + if dest.Live { + a.destView.SetCell(i+1, 1, tview.NewTableCell("[black:green]sending")) + } else { + a.destView.SetCell(i+1, 1, tview.NewTableCell("[white]off-air")) + } + a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]"+cmp.Or(dest.Container.State, "-"))) + + healthState := "-" + if dest.Container.State == "running" { + healthState = "healthy" + } + a.destView.SetCell(i+1, 3, tview.NewTableCell("[white]"+healthState)) + + cpuPercent := "-" + if dest.Container.State == "running" { + cpuPercent = fmt.Sprintf("%.1f", dest.Container.CPUPercent) + } + + memoryUsage := "-" + if dest.Container.State == "running" { + memoryUsage = fmt.Sprintf("%.1f", float64(dest.Container.MemoryUsageBytes)/1024/1024) + } + + a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]"+cpuPercent)) + a.destView.SetCell(i+1, 5, tview.NewTableCell("[white]"+memoryUsage)) a.destView.SetCell(i+1, 6, tview.NewTableCell("[green]Tab to go live")) } diff --git a/testhelpers/ffmpeg.go b/testhelpers/ffmpeg.go new file mode 100644 index 0000000..c4d50bb --- /dev/null +++ b/testhelpers/ffmpeg.go @@ -0,0 +1,39 @@ +package testhelpers + +import ( + "context" + "os/exec" + "syscall" + "testing" + + "github.com/stretchr/testify/require" +) + +// StreamFLV streams a test video to the given URL. +func StreamFLV(t *testing.T, destURL string) { + ctx, cancel := context.WithCancel(context.Background()) + + cmd := exec.CommandContext( + ctx, + "ffmpeg", + "-r", "30", + "-f", "lavfi", + "-i", "testsrc", + "-vf", "scale=1280:960", + "-vcodec", "libx264", + "-x264-params", "keyint=30:scenecut=0", // 1 key frame per second + "-profile:v", "baseline", + "-pix_fmt", "yuv420p", + "-f", "flv", + destURL, + ) + require.NoError(t, cmd.Start()) + + t.Cleanup(func() { + if cmd.Process != nil { + _ = cmd.Process.Signal(syscall.SIGINT) + } + + cancel() + }) +}