diff --git a/container/container.go b/container/container.go index 0c78050..15d4b66 100644 --- a/container/container.go +++ b/container/container.go @@ -3,7 +3,6 @@ package container import ( "cmp" "context" - "encoding/json" "errors" "fmt" "io" @@ -18,8 +17,9 @@ import ( "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/image" - "github.com/docker/docker/client" + "github.com/docker/docker/api/types/network" "github.com/google/uuid" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) const ( @@ -30,6 +30,24 @@ const ( defaultChanSize = 64 ) +// DockerClient isolates a docker *client.Client. +type DockerClient interface { + io.Closer + + ContainerCreate(context.Context, *container.Config, *container.HostConfig, *network.NetworkingConfig, *ocispec.Platform, string) (container.CreateResponse, error) + ContainerList(context.Context, container.ListOptions) ([]types.Container, error) + ContainerRemove(context.Context, string, container.RemoveOptions) error + ContainerStart(context.Context, string, container.StartOptions) error + ContainerStats(context.Context, string, bool) (container.StatsResponseReader, error) + ContainerStop(context.Context, string, container.StopOptions) error + ContainerWait(context.Context, string, container.WaitCondition) (<-chan container.WaitResponse, <-chan error) + Events(context.Context, events.ListOptions) (<-chan events.Message, <-chan error) + ImagePull(context.Context, string, image.PullOptions) (io.ReadCloser, error) + NetworkConnect(context.Context, string, string, *network.EndpointSettings) error + NetworkCreate(context.Context, string, network.CreateOptions) (network.CreateResponse, error) + NetworkRemove(context.Context, string) error +} + // Client provides a thin wrapper around the Docker API client, and provides // additional functionality such as exposing container stats. type Client struct { @@ -37,24 +55,27 @@ type Client struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - apiClient *client.Client + apiClient DockerClient + networkID string logger *slog.Logger } // NewClient creates a new Client. -func NewClient(ctx context.Context, logger *slog.Logger) (*Client, error) { - apiClient, err := client.NewClientWithOpts(client.FromEnv) +func NewClient(ctx context.Context, apiClient DockerClient, logger *slog.Logger) (*Client, error) { + id := uuid.New() + network, err := apiClient.NetworkCreate(ctx, "termstream-"+id.String(), network.CreateOptions{Driver: "bridge"}) if err != nil { - return nil, err + return nil, fmt.Errorf("network create: %w", err) } ctx, cancel := context.WithCancel(ctx) client := &Client{ - id: uuid.New(), + id: id, ctx: ctx, cancel: cancel, apiClient: apiClient, + networkID: network.ID, logger: logger, } @@ -65,49 +86,16 @@ func NewClient(ctx context.Context, logger *slog.Logger) (*Client, error) { type stats struct { cpuPercent float64 memoryUsageBytes uint64 + rxRate, txRate int } // getStats returns a channel that will receive container stats. The channel is // never closed, but the spawned goroutine will exit when the context is // cancelled. -func (a *Client) getStats(containerID string) <-chan stats { +func (a *Client) getStats(containerID string, networkCountConfig NetworkCountConfig) <-chan stats { ch := make(chan stats) - go func() { - statsReader, err := a.apiClient.ContainerStats(a.ctx, containerID, true) - if err != nil { - // TODO: error handling? - a.logger.Error("Error getting container stats", "err", err, "id", shortID(containerID)) - return - } - defer statsReader.Body.Close() - - buf := make([]byte, 4_096) - for { - n, err := statsReader.Body.Read(buf) - if err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { - break - } - a.logger.Error("Error reading stats", "err", err, "id", shortID(containerID)) - break - } - - var statsResp container.StatsResponse - if err = json.Unmarshal(buf[:n], &statsResp); err != nil { - a.logger.Error("Error unmarshalling stats", "err", err, "id", shortID(containerID)) - break - } - - // https://stackoverflow.com/a/30292327/62871 - cpuDelta := float64(statsResp.CPUStats.CPUUsage.TotalUsage - statsResp.PreCPUStats.CPUUsage.TotalUsage) - systemDelta := float64(statsResp.CPUStats.SystemUsage - statsResp.PreCPUStats.SystemUsage) - ch <- stats{ - cpuPercent: (cpuDelta / systemDelta) * float64(statsResp.CPUStats.OnlineCPUs) * 100, - memoryUsageBytes: statsResp.MemoryStats.Usage, - } - } - }() + go handleStats(a.ctx, containerID, a.apiClient, networkCountConfig, a.logger, ch) return ch } @@ -157,12 +145,19 @@ func (a *Client) getEvents(containerID string) <-chan events.Message { return sendC } +type NetworkCountConfig struct { + Rx string // the network name to count the Rx bytes + Tx string // the network name to count the Tx bytes +} + // RunContainerParams are the parameters for running a container. type RunContainerParams struct { - Name string - ChanSize int - ContainerConfig *container.Config - HostConfig *container.HostConfig + Name string + ChanSize int + ContainerConfig *container.Config + HostConfig *container.HostConfig + NetworkingConfig *network.NetworkingConfig + NetworkCountConfig NetworkCountConfig } // RunContainer runs a container with the given parameters. @@ -206,7 +201,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (< ctx, params.ContainerConfig, params.HostConfig, - nil, + params.NetworkingConfig, nil, name, ) @@ -216,6 +211,11 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (< } containerStateC <- domain.Container{ID: createResp.ID, State: "created"} + if err = a.apiClient.NetworkConnect(ctx, a.networkID, createResp.ID, nil); err != nil { + sendError(fmt.Errorf("network connect: %w", err)) + return + } + if err = a.apiClient.ContainerStart(ctx, createResp.ID, container.StartOptions{}); err != nil { sendError(fmt.Errorf("container start: %w", err)) return @@ -224,7 +224,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (< containerStateC <- domain.Container{ID: createResp.ID, State: "running"} - a.runContainerLoop(ctx, createResp.ID, containerStateC, errC) + a.runContainerLoop(ctx, createResp.ID, params.NetworkCountConfig, containerStateC, errC) }() return containerStateC, errC @@ -232,8 +232,14 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (< // runContainerLoop is the control loop for a single container. It returns only // when the container exits. -func (a *Client) runContainerLoop(ctx context.Context, containerID string, stateC chan<- domain.Container, errC chan<- error) { - statsC := a.getStats(containerID) +func (a *Client) runContainerLoop( + ctx context.Context, + containerID string, + networkCountConfig NetworkCountConfig, + stateC chan<- domain.Container, + errC chan<- error, +) { + statsC := a.getStats(containerID, networkCountConfig) eventsC := a.getEvents(containerID) containerRespC, containerErrC := a.apiClient.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) @@ -273,6 +279,8 @@ func (a *Client) runContainerLoop(ctx context.Context, containerID string, state case stats := <-statsC: state.CPUPercent = stats.cpuPercent state.MemoryUsageBytes = stats.memoryUsageBytes + state.RxRate = stats.rxRate + state.TxRate = stats.txRate sendState() } } @@ -298,6 +306,12 @@ func (a *Client) Close() error { a.wg.Wait() + if a.networkID != "" { + if err := a.apiClient.NetworkRemove(ctx, a.networkID); err != nil { + a.logger.Error("Error removing network", "err", err) + } + } + return a.apiClient.Close() } diff --git a/container/container_test.go b/container/integration_test.go similarity index 92% rename from container/container_test.go rename to container/integration_test.go index 050c288..b73af86 100644 --- a/container/container_test.go +++ b/container/integration_test.go @@ -8,6 +8,7 @@ import ( "git.netflux.io/rob/termstream/container" "git.netflux.io/rob/termstream/testhelpers" typescontainer "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,10 +19,12 @@ func TestClientStartStop(t *testing.T) { t.Cleanup(cancel) logger := testhelpers.NewTestLogger() + apiClient, err := client.NewClientWithOpts(client.FromEnv) + require.NoError(t, err) containerName := "termstream-test-" + uuid.NewString() component := "test-start-stop" - client, err := container.NewClient(ctx, logger) + client, err := container.NewClient(ctx, apiClient, logger) require.NoError(t, err) running, err := client.ContainerRunning(ctx, map[string]string{"component": component}) @@ -66,9 +69,11 @@ func TestClientRemoveContainers(t *testing.T) { t.Cleanup(cancel) logger := testhelpers.NewTestLogger() + apiClient, err := client.NewClientWithOpts(client.FromEnv) + require.NoError(t, err) component := "test-remove-containers" - client, err := container.NewClient(ctx, logger) + client, err := container.NewClient(ctx, apiClient, logger) require.NoError(t, err) t.Cleanup(func() { client.Close() }) diff --git a/container/stats.go b/container/stats.go new file mode 100644 index 0000000..1ca6749 --- /dev/null +++ b/container/stats.go @@ -0,0 +1,99 @@ +package container + +import ( + "cmp" + "context" + "encoding/json" + "errors" + "io" + "log/slog" + + "github.com/docker/docker/api/types/container" +) + +func handleStats( + ctx context.Context, + containerID string, + apiClient DockerClient, + networkCountConfig NetworkCountConfig, + logger *slog.Logger, + ch chan<- stats, +) { + networkNameRx := cmp.Or(networkCountConfig.Rx, "eth0") + networkNameTx := cmp.Or(networkCountConfig.Tx, "eth0") + + statsReader, err := apiClient.ContainerStats(ctx, containerID, true) + if err != nil { + // TODO: error handling? + logger.Error("Error getting container stats", "err", err, "id", shortID(containerID)) + return + } + defer statsReader.Body.Close() + + var ( + processedAny bool + lastNetworkRx uint64 + lastNetworkTx uint64 + ) + + getAvgRxRate := rolling(10) + getAvgTxRate := rolling(10) + + buf := make([]byte, 4_096) + for { + n, err := statsReader.Body.Read(buf) + if err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + break + } + logger.Error("Error reading stats", "err", err, "id", shortID(containerID)) + break + } + + var statsResp container.StatsResponse + if err = json.Unmarshal(buf[:n], &statsResp); err != nil { + logger.Error("Error unmarshalling stats", "err", err, "id", shortID(containerID)) + break + } + + // https://stackoverflow.com/a/30292327/62871 + cpuDelta := float64(statsResp.CPUStats.CPUUsage.TotalUsage - statsResp.PreCPUStats.CPUUsage.TotalUsage) + systemDelta := float64(statsResp.CPUStats.SystemUsage - statsResp.PreCPUStats.SystemUsage) + + var avgRxRate, avgTxRate float64 + if processedAny { + secondsSinceLastReceived := statsResp.Read.Sub(statsResp.PreRead).Seconds() + diffRxBytes := (statsResp.Networks[networkNameRx].RxBytes - lastNetworkRx) + diffTxBytes := (statsResp.Networks[networkNameTx].TxBytes - lastNetworkTx) + rxRate := float64(diffRxBytes) / secondsSinceLastReceived / 1000.0 * 8 + txRate := float64(diffTxBytes) / secondsSinceLastReceived / 1000.0 * 8 + avgRxRate = getAvgRxRate(rxRate) + avgTxRate = getAvgTxRate(txRate) + } + + lastNetworkRx = statsResp.Networks[networkNameRx].RxBytes + lastNetworkTx = statsResp.Networks[networkNameTx].TxBytes + processedAny = true + + ch <- stats{ + cpuPercent: (cpuDelta / systemDelta) * float64(statsResp.CPUStats.OnlineCPUs) * 100, + memoryUsageBytes: statsResp.MemoryStats.Usage, + rxRate: int(avgRxRate), + txRate: int(avgTxRate), + } + } +} + +// https://stackoverflow.com/a/12539781/62871 +func rolling(n int) func(float64) float64 { + bins := make([]float64, n) + var avg float64 + var i int + + return func(x float64) float64 { + avg += (x - bins[i]) / float64(n) + bins[i] = x + i = (i + 1) % n + return avg + } +} diff --git a/container/stats_test.go b/container/stats_test.go new file mode 100644 index 0000000..8efe668 --- /dev/null +++ b/container/stats_test.go @@ -0,0 +1,58 @@ +package container + +import ( + "bufio" + "bytes" + "context" + _ "embed" + "io" + "testing" + + "git.netflux.io/rob/termstream/testhelpers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +//go:embed testdata/stats1.json +var statsJSON []byte + +func TestHandleStats(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + pr, pw := io.Pipe() + containerID := "b905f51b47242090ae504c184c7bc84d6274511ef763c1847039dcaa00a3ad27" + dockerClient := testhelpers.MockDockerClient{ContainerStatsResponse: pr} + networkCountConfig := NetworkCountConfig{Rx: "eth0", Tx: "eth1"} + logger := testhelpers.NewTestLogger() + ch := make(chan stats) + + go func() { + defer close(ch) + + handleStats(ctx, containerID, &dockerClient, networkCountConfig, logger, ch) + }() + + go func() { + defer pw.Close() + + scanner := bufio.NewScanner(bytes.NewReader(statsJSON)) + for scanner.Scan() { + _, err := pw.Write(scanner.Bytes()) + require.NoError(t, err) + } + }() + + var count int + var lastStats stats + for stats := range ch { + count++ + lastStats = stats + } + + require.Equal(t, 10, count) + assert.Equal(t, 4.254369426751593, lastStats.cpuPercent) + assert.Equal(t, uint64(8802304), lastStats.memoryUsageBytes) + assert.Equal(t, 1091, lastStats.rxRate) + assert.Equal(t, 1108, lastStats.txRate) +} diff --git a/container/testdata/stats1.json b/container/testdata/stats1.json new file mode 100644 index 0000000..6a228fc --- /dev/null +++ b/container/testdata/stats1.json @@ -0,0 +1,10 @@ +{"read":"2025-02-05T17:01:20.137117141Z","preread":"0001-01-01T00:00:00Z","pids_stats":{"current":13,"limit":18064},"blkio_stats":{"io_service_bytes_recursive":[{"major":259,"minor":0,"op":"read","value":49152},{"major":259,"minor":0,"op":"write","value":0},{"major":254,"minor":0,"op":"read","value":49152},{"major":254,"minor":0,"op":"write","value":0}],"io_serviced_recursive":null,"io_queue_recursive":null,"io_service_time_recursive":null,"io_wait_time_recursive":null,"io_merged_recursive":null,"io_time_recursive":null,"sectors_recursive":null},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":819184000,"usage_in_kernelmode":359404000,"usage_in_usermode":459780000},"system_cpu_usage":7736013230000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"precpu_stats":{"cpu_usage":{"total_usage":712183000,"usage_in_kernelmode":325497000,"usage_in_usermode":386686000},"system_cpu_usage":7736005250000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"memory_stats":{"usage":8781824,"stats":{"active_anon":0,"active_file":0,"anon":7434240,"anon_thp":0,"file":0,"file_dirty":0,"file_mapped":0,"file_writeback":0,"inactive_anon":7569408,"inactive_file":0,"kernel_stack":245760,"pgactivate":0,"pgdeactivate":0,"pgfault":7029,"pglazyfree":0,"pglazyfreed":0,"pgmajfault":0,"pgrefill":0,"pgscan":0,"pgsteal":0,"shmem":0,"slab":133328,"slab_reclaimable":0,"slab_unreclaimable":133328,"sock":0,"thp_collapse_alloc":0,"thp_fault_alloc":0,"unevictable":0,"workingset_activate":0,"workingset_nodereclaim":0,"workingset_refault":0},"limit":15815278592},"name":"/termstream-9bc4f2c4-fa6b-4f94-aa36-f9bfaecc498f-mediaserver","id":"f9b906b7d796c6b009cf309e9d980f5103da627988506653662519c88da71ab7","networks":{"eth0":{"rx_bytes":2196425,"rx_packets":1344,"rx_errors":0,"rx_dropped":0,"tx_bytes":82398,"tx_packets":1165,"tx_errors":0,"tx_dropped":0},"eth1":{"rx_bytes":1703196,"rx_packets":2164,"rx_errors":0,"rx_dropped":0,"tx_bytes":1689973,"tx_packets":2009,"tx_errors":0,"tx_dropped":0}}} +{"read":"2025-02-05T17:01:21.14260362Z","preread":"2025-02-05T17:01:20.137117141Z","pids_stats":{"current":13,"limit":18064},"blkio_stats":{"io_service_bytes_recursive":[{"major":259,"minor":0,"op":"read","value":49152},{"major":259,"minor":0,"op":"write","value":0},{"major":254,"minor":0,"op":"read","value":49152},{"major":254,"minor":0,"op":"write","value":0}],"io_serviced_recursive":null,"io_queue_recursive":null,"io_service_time_recursive":null,"io_wait_time_recursive":null,"io_merged_recursive":null,"io_time_recursive":null,"sectors_recursive":null},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":866191000,"usage_in_kernelmode":379415000,"usage_in_usermode":486775000},"system_cpu_usage":7736021190000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"precpu_stats":{"cpu_usage":{"total_usage":819184000,"usage_in_kernelmode":359404000,"usage_in_usermode":459780000},"system_cpu_usage":7736013230000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"memory_stats":{"usage":8687616,"stats":{"active_anon":0,"active_file":0,"anon":7434240,"anon_thp":0,"file":0,"file_dirty":0,"file_mapped":0,"file_writeback":0,"inactive_anon":7569408,"inactive_file":0,"kernel_stack":245760,"pgactivate":0,"pgdeactivate":0,"pgfault":7029,"pglazyfree":0,"pglazyfreed":0,"pgmajfault":0,"pgrefill":0,"pgscan":0,"pgsteal":0,"shmem":0,"slab":133328,"slab_reclaimable":0,"slab_unreclaimable":133328,"sock":0,"thp_collapse_alloc":0,"thp_fault_alloc":0,"unevictable":0,"workingset_activate":0,"workingset_nodereclaim":0,"workingset_refault":0},"limit":15815278592},"name":"/termstream-9bc4f2c4-fa6b-4f94-aa36-f9bfaecc498f-mediaserver","id":"f9b906b7d796c6b009cf309e9d980f5103da627988506653662519c88da71ab7","networks":{"eth0":{"rx_bytes":2330574,"rx_packets":1438,"rx_errors":0,"rx_dropped":0,"tx_bytes":88562,"tx_packets":1245,"tx_errors":0,"tx_dropped":0},"eth1":{"rx_bytes":1844024,"rx_packets":2355,"rx_errors":0,"rx_dropped":0,"tx_bytes":1830848,"tx_packets":2207,"tx_errors":0,"tx_dropped":0}}} +{"read":"2025-02-05T17:01:22.155870322Z","preread":"2025-02-05T17:01:21.14260362Z","pids_stats":{"current":13,"limit":18064},"blkio_stats":{"io_service_bytes_recursive":[{"major":259,"minor":0,"op":"read","value":49152},{"major":259,"minor":0,"op":"write","value":0},{"major":254,"minor":0,"op":"read","value":49152},{"major":254,"minor":0,"op":"write","value":0}],"io_serviced_recursive":null,"io_queue_recursive":null,"io_service_time_recursive":null,"io_wait_time_recursive":null,"io_merged_recursive":null,"io_time_recursive":null,"sectors_recursive":null},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":913275000,"usage_in_kernelmode":398734000,"usage_in_usermode":514540000},"system_cpu_usage":7736029170000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"precpu_stats":{"cpu_usage":{"total_usage":866191000,"usage_in_kernelmode":379415000,"usage_in_usermode":486775000},"system_cpu_usage":7736021190000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"memory_stats":{"usage":8749056,"stats":{"active_anon":0,"active_file":0,"anon":7434240,"anon_thp":0,"file":0,"file_dirty":0,"file_mapped":0,"file_writeback":0,"inactive_anon":7704576,"inactive_file":0,"kernel_stack":245760,"pgactivate":0,"pgdeactivate":0,"pgfault":7029,"pglazyfree":0,"pglazyfreed":0,"pgmajfault":0,"pgrefill":0,"pgscan":0,"pgsteal":0,"shmem":0,"slab":133328,"slab_reclaimable":0,"slab_unreclaimable":133328,"sock":0,"thp_collapse_alloc":0,"thp_fault_alloc":0,"unevictable":0,"workingset_activate":0,"workingset_nodereclaim":0,"workingset_refault":0},"limit":15815278592},"name":"/termstream-9bc4f2c4-fa6b-4f94-aa36-f9bfaecc498f-mediaserver","id":"f9b906b7d796c6b009cf309e9d980f5103da627988506653662519c88da71ab7","networks":{"eth0":{"rx_bytes":2510652,"rx_packets":1529,"rx_errors":0,"rx_dropped":0,"tx_bytes":93776,"tx_packets":1324,"tx_errors":0,"tx_dropped":0},"eth1":{"rx_bytes":2030256,"rx_packets":2548,"rx_errors":0,"rx_dropped":0,"tx_bytes":2018217,"tx_packets":2409,"tx_errors":0,"tx_dropped":0}}} +{"read":"2025-02-05T17:01:23.160742801Z","preread":"2025-02-05T17:01:22.155870322Z","pids_stats":{"current":13,"limit":18064},"blkio_stats":{"io_service_bytes_recursive":[{"major":259,"minor":0,"op":"read","value":49152},{"major":259,"minor":0,"op":"write","value":0},{"major":254,"minor":0,"op":"read","value":49152},{"major":254,"minor":0,"op":"write","value":0}],"io_serviced_recursive":null,"io_queue_recursive":null,"io_service_time_recursive":null,"io_wait_time_recursive":null,"io_merged_recursive":null,"io_time_recursive":null,"sectors_recursive":null},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":966402000,"usage_in_kernelmode":447388000,"usage_in_usermode":519013000},"system_cpu_usage":7736037050000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"precpu_stats":{"cpu_usage":{"total_usage":913275000,"usage_in_kernelmode":398734000,"usage_in_usermode":514540000},"system_cpu_usage":7736029170000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"memory_stats":{"usage":8761344,"stats":{"active_anon":0,"active_file":0,"anon":7434240,"anon_thp":0,"file":0,"file_dirty":0,"file_mapped":0,"file_writeback":0,"inactive_anon":7704576,"inactive_file":0,"kernel_stack":245760,"pgactivate":0,"pgdeactivate":0,"pgfault":7029,"pglazyfree":0,"pglazyfreed":0,"pgmajfault":0,"pgrefill":0,"pgscan":0,"pgsteal":0,"shmem":0,"slab":133328,"slab_reclaimable":0,"slab_unreclaimable":133328,"sock":0,"thp_collapse_alloc":0,"thp_fault_alloc":0,"unevictable":0,"workingset_activate":0,"workingset_nodereclaim":0,"workingset_refault":0},"limit":15815278592},"name":"/termstream-9bc4f2c4-fa6b-4f94-aa36-f9bfaecc498f-mediaserver","id":"f9b906b7d796c6b009cf309e9d980f5103da627988506653662519c88da71ab7","networks":{"eth0":{"rx_bytes":2669413,"rx_packets":1622,"rx_errors":0,"rx_dropped":0,"tx_bytes":99452,"tx_packets":1410,"tx_errors":0,"tx_dropped":0},"eth1":{"rx_bytes":2194470,"rx_packets":2740,"rx_errors":0,"rx_dropped":0,"tx_bytes":2183821,"tx_packets":2606,"tx_errors":0,"tx_dropped":0}}} +{"read":"2025-02-05T17:01:24.167215055Z","preread":"2025-02-05T17:01:23.160742801Z","pids_stats":{"current":13,"limit":18064},"blkio_stats":{"io_service_bytes_recursive":[{"major":259,"minor":0,"op":"read","value":49152},{"major":259,"minor":0,"op":"write","value":0},{"major":254,"minor":0,"op":"read","value":49152},{"major":254,"minor":0,"op":"write","value":0}],"io_serviced_recursive":null,"io_queue_recursive":null,"io_service_time_recursive":null,"io_wait_time_recursive":null,"io_merged_recursive":null,"io_time_recursive":null,"sectors_recursive":null},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":1018108000,"usage_in_kernelmode":475033000,"usage_in_usermode":543074000},"system_cpu_usage":7736044960000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"precpu_stats":{"cpu_usage":{"total_usage":966402000,"usage_in_kernelmode":447388000,"usage_in_usermode":519013000},"system_cpu_usage":7736037050000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"memory_stats":{"usage":8826880,"stats":{"active_anon":0,"active_file":0,"anon":7569408,"anon_thp":0,"file":0,"file_dirty":0,"file_mapped":0,"file_writeback":0,"inactive_anon":7704576,"inactive_file":0,"kernel_stack":245760,"pgactivate":0,"pgdeactivate":0,"pgfault":7029,"pglazyfree":0,"pglazyfreed":0,"pgmajfault":0,"pgrefill":0,"pgscan":0,"pgsteal":0,"shmem":0,"slab":133328,"slab_reclaimable":0,"slab_unreclaimable":133328,"sock":0,"thp_collapse_alloc":0,"thp_fault_alloc":0,"unevictable":0,"workingset_activate":0,"workingset_nodereclaim":0,"workingset_refault":0},"limit":15815278592},"name":"/termstream-9bc4f2c4-fa6b-4f94-aa36-f9bfaecc498f-mediaserver","id":"f9b906b7d796c6b009cf309e9d980f5103da627988506653662519c88da71ab7","networks":{"eth0":{"rx_bytes":2828639,"rx_packets":1711,"rx_errors":0,"rx_dropped":0,"tx_bytes":104666,"tx_packets":1489,"tx_errors":0,"tx_dropped":0},"eth1":{"rx_bytes":2360195,"rx_packets":2932,"rx_errors":0,"rx_dropped":0,"tx_bytes":2350223,"tx_packets":2804,"tx_errors":0,"tx_dropped":0}}} +{"read":"2025-02-05T17:01:25.173236428Z","preread":"2025-02-05T17:01:24.167215055Z","pids_stats":{"current":13,"limit":18064},"blkio_stats":{"io_service_bytes_recursive":[{"major":259,"minor":0,"op":"read","value":49152},{"major":259,"minor":0,"op":"write","value":0},{"major":254,"minor":0,"op":"read","value":49152},{"major":254,"minor":0,"op":"write","value":0}],"io_serviced_recursive":null,"io_queue_recursive":null,"io_service_time_recursive":null,"io_wait_time_recursive":null,"io_merged_recursive":null,"io_time_recursive":null,"sectors_recursive":null},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":1060844000,"usage_in_kernelmode":502946000,"usage_in_usermode":557897000},"system_cpu_usage":7736052900000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"precpu_stats":{"cpu_usage":{"total_usage":1018108000,"usage_in_kernelmode":475033000,"usage_in_usermode":543074000},"system_cpu_usage":7736044960000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"memory_stats":{"usage":8785920,"stats":{"active_anon":0,"active_file":0,"anon":7569408,"anon_thp":0,"file":0,"file_dirty":0,"file_mapped":0,"file_writeback":0,"inactive_anon":7704576,"inactive_file":0,"kernel_stack":245760,"pgactivate":0,"pgdeactivate":0,"pgfault":7029,"pglazyfree":0,"pglazyfreed":0,"pgmajfault":0,"pgrefill":0,"pgscan":0,"pgsteal":0,"shmem":0,"slab":133328,"slab_reclaimable":0,"slab_unreclaimable":133328,"sock":0,"thp_collapse_alloc":0,"thp_fault_alloc":0,"unevictable":0,"workingset_activate":0,"workingset_nodereclaim":0,"workingset_refault":0},"limit":15815278592},"name":"/termstream-9bc4f2c4-fa6b-4f94-aa36-f9bfaecc498f-mediaserver","id":"f9b906b7d796c6b009cf309e9d980f5103da627988506653662519c88da71ab7","networks":{"eth0":{"rx_bytes":2938169,"rx_packets":1796,"rx_errors":0,"rx_dropped":0,"tx_bytes":109880,"tx_packets":1568,"tx_errors":0,"tx_dropped":0},"eth1":{"rx_bytes":2478337,"rx_packets":3119,"rx_errors":0,"rx_dropped":0,"tx_bytes":2467333,"tx_packets":3004,"tx_errors":0,"tx_dropped":0}}} +{"read":"2025-02-05T17:01:26.179616066Z","preread":"2025-02-05T17:01:25.173236428Z","pids_stats":{"current":13,"limit":18064},"blkio_stats":{"io_service_bytes_recursive":[{"major":259,"minor":0,"op":"read","value":49152},{"major":259,"minor":0,"op":"write","value":0},{"major":254,"minor":0,"op":"read","value":49152},{"major":254,"minor":0,"op":"write","value":0}],"io_serviced_recursive":null,"io_queue_recursive":null,"io_service_time_recursive":null,"io_wait_time_recursive":null,"io_merged_recursive":null,"io_time_recursive":null,"sectors_recursive":null},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":1118107000,"usage_in_kernelmode":528240000,"usage_in_usermode":589866000},"system_cpu_usage":7736060850000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"precpu_stats":{"cpu_usage":{"total_usage":1060844000,"usage_in_kernelmode":502946000,"usage_in_usermode":557897000},"system_cpu_usage":7736052900000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"memory_stats":{"usage":9170944,"stats":{"active_anon":0,"active_file":0,"anon":7704576,"anon_thp":0,"file":0,"file_dirty":0,"file_mapped":0,"file_writeback":0,"inactive_anon":7839744,"inactive_file":0,"kernel_stack":245760,"pgactivate":0,"pgdeactivate":0,"pgfault":7194,"pglazyfree":0,"pglazyfreed":0,"pgmajfault":0,"pgrefill":0,"pgscan":0,"pgsteal":0,"shmem":0,"slab":133328,"slab_reclaimable":0,"slab_unreclaimable":133328,"sock":0,"thp_collapse_alloc":0,"thp_fault_alloc":0,"unevictable":0,"workingset_activate":0,"workingset_nodereclaim":0,"workingset_refault":0},"limit":15815278592},"name":"/termstream-9bc4f2c4-fa6b-4f94-aa36-f9bfaecc498f-mediaserver","id":"f9b906b7d796c6b009cf309e9d980f5103da627988506653662519c88da71ab7","networks":{"eth0":{"rx_bytes":3134326,"rx_packets":1893,"rx_errors":0,"rx_dropped":0,"tx_bytes":116308,"tx_packets":1652,"tx_errors":0,"tx_dropped":0},"eth1":{"rx_bytes":2672756,"rx_packets":3314,"rx_errors":0,"rx_dropped":0,"tx_bytes":2670151,"tx_packets":3204,"tx_errors":0,"tx_dropped":0}}} +{"read":"2025-02-05T17:01:27.18855043Z","preread":"2025-02-05T17:01:26.179616066Z","pids_stats":{"current":13,"limit":18064},"blkio_stats":{"io_service_bytes_recursive":[{"major":259,"minor":0,"op":"read","value":49152},{"major":259,"minor":0,"op":"write","value":0},{"major":254,"minor":0,"op":"read","value":49152},{"major":254,"minor":0,"op":"write","value":0}],"io_serviced_recursive":null,"io_queue_recursive":null,"io_service_time_recursive":null,"io_wait_time_recursive":null,"io_merged_recursive":null,"io_time_recursive":null,"sectors_recursive":null},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":1171124000,"usage_in_kernelmode":557386000,"usage_in_usermode":613738000},"system_cpu_usage":7736068880000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"precpu_stats":{"cpu_usage":{"total_usage":1118107000,"usage_in_kernelmode":528240000,"usage_in_usermode":589866000},"system_cpu_usage":7736060850000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"memory_stats":{"usage":8970240,"stats":{"active_anon":0,"active_file":0,"anon":7704576,"anon_thp":0,"file":0,"file_dirty":0,"file_mapped":0,"file_writeback":0,"inactive_anon":7839744,"inactive_file":0,"kernel_stack":245760,"pgactivate":0,"pgdeactivate":0,"pgfault":7194,"pglazyfree":0,"pglazyfreed":0,"pgmajfault":0,"pgrefill":0,"pgscan":0,"pgsteal":0,"shmem":0,"slab":133328,"slab_reclaimable":0,"slab_unreclaimable":133328,"sock":0,"thp_collapse_alloc":0,"thp_fault_alloc":0,"unevictable":0,"workingset_activate":0,"workingset_nodereclaim":0,"workingset_refault":0},"limit":15815278592},"name":"/termstream-9bc4f2c4-fa6b-4f94-aa36-f9bfaecc498f-mediaserver","id":"f9b906b7d796c6b009cf309e9d980f5103da627988506653662519c88da71ab7","networks":{"eth0":{"rx_bytes":3297792,"rx_packets":1985,"rx_errors":0,"rx_dropped":0,"tx_bytes":121852,"tx_packets":1736,"tx_errors":0,"tx_dropped":0},"eth1":{"rx_bytes":2844481,"rx_packets":3512,"rx_errors":0,"rx_dropped":0,"tx_bytes":2840655,"tx_packets":3403,"tx_errors":0,"tx_dropped":0}}} +{"read":"2025-02-05T17:01:28.195742093Z","preread":"2025-02-05T17:01:27.18855043Z","pids_stats":{"current":13,"limit":18064},"blkio_stats":{"io_service_bytes_recursive":[{"major":259,"minor":0,"op":"read","value":49152},{"major":259,"minor":0,"op":"write","value":0},{"major":254,"minor":0,"op":"read","value":49152},{"major":254,"minor":0,"op":"write","value":0}],"io_serviced_recursive":null,"io_queue_recursive":null,"io_service_time_recursive":null,"io_wait_time_recursive":null,"io_merged_recursive":null,"io_time_recursive":null,"sectors_recursive":null},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":1215550000,"usage_in_kernelmode":583643000,"usage_in_usermode":631906000},"system_cpu_usage":7736076800000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"precpu_stats":{"cpu_usage":{"total_usage":1171124000,"usage_in_kernelmode":557386000,"usage_in_usermode":613738000},"system_cpu_usage":7736068880000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"memory_stats":{"usage":8974336,"stats":{"active_anon":0,"active_file":0,"anon":7704576,"anon_thp":0,"file":0,"file_dirty":0,"file_mapped":0,"file_writeback":0,"inactive_anon":7839744,"inactive_file":0,"kernel_stack":245760,"pgactivate":0,"pgdeactivate":0,"pgfault":7194,"pglazyfree":0,"pglazyfreed":0,"pgmajfault":0,"pgrefill":0,"pgscan":0,"pgsteal":0,"shmem":0,"slab":133328,"slab_reclaimable":0,"slab_unreclaimable":133328,"sock":0,"thp_collapse_alloc":0,"thp_fault_alloc":0,"unevictable":0,"workingset_activate":0,"workingset_nodereclaim":0,"workingset_refault":0},"limit":15815278592},"name":"/termstream-9bc4f2c4-fa6b-4f94-aa36-f9bfaecc498f-mediaserver","id":"f9b906b7d796c6b009cf309e9d980f5103da627988506653662519c88da71ab7","networks":{"eth0":{"rx_bytes":3449190,"rx_packets":2072,"rx_errors":0,"rx_dropped":0,"tx_bytes":127264,"tx_packets":1818,"tx_errors":0,"tx_dropped":0},"eth1":{"rx_bytes":3006636,"rx_packets":3697,"rx_errors":0,"rx_dropped":0,"tx_bytes":2999359,"tx_packets":3601,"tx_errors":0,"tx_dropped":0}}} +{"read":"2025-02-05T17:01:29.201773306Z","preread":"2025-02-05T17:01:28.195742093Z","pids_stats":{"current":13,"limit":18064},"blkio_stats":{"io_service_bytes_recursive":[{"major":259,"minor":0,"op":"read","value":49152},{"major":259,"minor":0,"op":"write","value":0},{"major":254,"minor":0,"op":"read","value":49152},{"major":254,"minor":0,"op":"write","value":0}],"io_serviced_recursive":null,"io_queue_recursive":null,"io_service_time_recursive":null,"io_wait_time_recursive":null,"io_merged_recursive":null,"io_time_recursive":null,"sectors_recursive":null},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":1257296000,"usage_in_kernelmode":614088000,"usage_in_usermode":643208000},"system_cpu_usage":7736084650000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"precpu_stats":{"cpu_usage":{"total_usage":1215550000,"usage_in_kernelmode":583643000,"usage_in_usermode":631906000},"system_cpu_usage":7736076800000000,"online_cpus":8,"throttling_data":{"periods":0,"throttled_periods":0,"throttled_time":0}},"memory_stats":{"usage":8802304,"stats":{"active_anon":0,"active_file":0,"anon":7839744,"anon_thp":0,"file":0,"file_dirty":0,"file_mapped":0,"file_writeback":0,"inactive_anon":7704576,"inactive_file":0,"kernel_stack":245760,"pgactivate":0,"pgdeactivate":0,"pgfault":7194,"pglazyfree":0,"pglazyfreed":0,"pgmajfault":0,"pgrefill":0,"pgscan":0,"pgsteal":0,"shmem":0,"slab":133328,"slab_reclaimable":0,"slab_unreclaimable":133328,"sock":0,"thp_collapse_alloc":0,"thp_fault_alloc":0,"unevictable":0,"workingset_activate":0,"workingset_nodereclaim":0,"workingset_refault":0},"limit":15815278592},"name":"/termstream-9bc4f2c4-fa6b-4f94-aa36-f9bfaecc498f-mediaserver","id":"f9b906b7d796c6b009cf309e9d980f5103da627988506653662519c88da71ab7","networks":{"eth0":{"rx_bytes":3570666,"rx_packets":2158,"rx_errors":0,"rx_dropped":0,"tx_bytes":132742,"tx_packets":1901,"tx_errors":0,"tx_dropped":0},"eth1":{"rx_bytes":3094988,"rx_packets":3833,"rx_errors":0,"rx_dropped":0,"tx_bytes":3085561,"tx_packets":3738,"tx_errors":0,"tx_dropped":0}}} diff --git a/domain/types.go b/domain/types.go index f0aa815..6eef2ca 100644 --- a/domain/types.go +++ b/domain/types.go @@ -8,10 +8,11 @@ type AppState struct { // Source represents the source, currently always the mediaserver. type Source struct { - Container Container - Live bool - Listeners int - URL string + Container Container + Live bool + Listeners int + RTMPURL string + RTMPInternalURL string } // Destination is a single destination. @@ -31,4 +32,6 @@ type Container struct { HealthState string CPUPercent float64 MemoryUsageBytes uint64 + RxRate int + TxRate int } diff --git a/go.mod b/go.mod index 92c4d23..3bdbcca 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/docker/go-connections v0.5.0 github.com/gdamore/tcell/v2 v2.7.1 github.com/google/uuid v1.6.0 + github.com/opencontainers/image-spec v1.1.0 github.com/rivo/tview v0.0.0-20241227133733-17b7edb88c57 github.com/stretchr/testify v1.10.0 gopkg.in/yaml.v3 v3.0.1 @@ -29,7 +30,6 @@ require ( github.com/moby/term v0.5.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect - github.com/opencontainers/image-spec v1.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect diff --git a/main.go b/main.go index f4ea960..50a07a2 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "git.netflux.io/rob/termstream/mediaserver" "git.netflux.io/rob/termstream/multiplexer" "git.netflux.io/rob/termstream/terminal" + "github.com/docker/docker/client" ) func main() { @@ -52,7 +53,12 @@ func run(ctx context.Context, cfgReader io.Reader) error { updateUI := func() { ui.SetState(*state) } updateUI() - containerClient, err := container.NewClient(ctx, logger.With("component", "container_client")) + apiClient, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + return fmt.Errorf("new docker client: %w", err) + } + + containerClient, err := container.NewClient(ctx, apiClient, logger.With("component", "container_client")) if err != nil { return fmt.Errorf("new container client: %w", err) } @@ -65,7 +71,7 @@ func run(ctx context.Context, cfgReader io.Reader) error { defer srv.Close() mp := multiplexer.NewActor(ctx, multiplexer.NewActorParams{ - SourceURL: srv.State().URL, + SourceURL: srv.State().RTMPInternalURL, ContainerClient: containerClient, Logger: logger.With("component", "multiplexer"), }) diff --git a/mediaserver/actor.go b/mediaserver/actor.go index 91821d0..a7548ba 100644 --- a/mediaserver/actor.go +++ b/mediaserver/actor.go @@ -18,8 +18,8 @@ import ( const ( defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server - defaultAPIPort = 9997 // default API port for the media server - defaultRTMPPort = 1935 // default RTMP port for the media server + defaultAPIPort = 9997 // default API host port for the media server + defaultRTMPPort = 1935 // default RTMP host port for the media server defaultChanSize = 64 // default channel size for asynchronous non-error channels imageNameMediaMTX = "netfluxio/mediamtx-alpine:latest" // image name for mediamtx rtmpPath = "live" // RTMP path for the media server @@ -84,7 +84,8 @@ func StartActor(ctx context.Context, params StartActorParams) *Actor { Name: componentName, ChanSize: chanSize, ContainerConfig: &typescontainer.Config{ - Image: imageNameMediaMTX, + Image: imageNameMediaMTX, + Hostname: "mediaserver", Env: []string{ "MTX_LOGLEVEL=info", "MTX_API=yes", @@ -105,10 +106,12 @@ func StartActor(ctx context.Context, params StartActorParams) *Actor { NetworkMode: "default", PortBindings: portBindings, }, + NetworkCountConfig: container.NetworkCountConfig{Rx: "eth0", Tx: "eth1"}, }, ) - actor.state.URL = actor.rtmpURL() + actor.state.RTMPURL = actor.rtmpURL() + actor.state.RTMPInternalURL = actor.rtmpInternalURL() go actor.actorLoop(containerStateC, errC) @@ -152,6 +155,7 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e select { case containerState := <-containerStateC: s.state.Container = containerState + sendState() continue @@ -191,6 +195,13 @@ func (s *Actor) rtmpURL() string { return fmt.Sprintf("rtmp://localhost:%d/%s", s.rtmpPort, rtmpPath) } +// rtmpInternalURL returns the RTMP URL for the media server, accessible from +// the app network. +func (s *Actor) rtmpInternalURL() string { + // Container port, not host port: + return fmt.Sprintf("rtmp://mediaserver:1935/%s", rtmpPath) +} + // apiURL returns the API URL for the media server, accessible from the host. func (s *Actor) apiURL() string { return fmt.Sprintf("http://localhost:%d/v3/rtmpconns/list", s.apiPort) diff --git a/mediaserver/actor_test.go b/mediaserver/integration_test.go similarity index 79% rename from mediaserver/actor_test.go rename to mediaserver/integration_test.go index 859855c..6d7ebab 100644 --- a/mediaserver/actor_test.go +++ b/mediaserver/integration_test.go @@ -8,6 +8,7 @@ import ( "git.netflux.io/rob/termstream/container" "git.netflux.io/rob/termstream/mediaserver" "git.netflux.io/rob/termstream/testhelpers" + "github.com/docker/docker/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -19,7 +20,10 @@ func TestMediaServerStartStop(t *testing.T) { t.Cleanup(cancel) logger := testhelpers.NewTestLogger() - containerClient, err := container.NewClient(ctx, logger) + apiClient, err := client.NewClientWithOpts(client.FromEnv) + require.NoError(t, err) + + containerClient, err := container.NewClient(ctx, apiClient, logger) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, containerClient.Close()) }) @@ -49,7 +53,7 @@ func TestMediaServerStartStop(t *testing.T) { state := mediaServer.State() assert.False(t, state.Live) - assert.Equal(t, "rtmp://localhost:1935/live", state.URL) + assert.Equal(t, "rtmp://localhost:1935/live", state.RTMPURL) testhelpers.StreamFLV(t, "rtmp://localhost:1935/live") @@ -64,6 +68,17 @@ func TestMediaServerStartStop(t *testing.T) { "actor not healthy and/or in LIVE state", ) + require.Eventually( + t, + func() bool { + currState := mediaServer.State() + return currState.Container.RxRate > 500 + }, + time.Second*10, + time.Second, + "actor not healthy and/or in LIVE state", + ) + mediaServer.Close() running, err = containerClient.ContainerRunning(ctx, map[string]string{"component": component}) diff --git a/multiplexer/multiplexer_test.go b/multiplexer/integration_test.go similarity index 72% rename from multiplexer/multiplexer_test.go rename to multiplexer/integration_test.go index 695f84a..a5dc603 100644 --- a/multiplexer/multiplexer_test.go +++ b/multiplexer/integration_test.go @@ -9,6 +9,7 @@ import ( "git.netflux.io/rob/termstream/mediaserver" "git.netflux.io/rob/termstream/multiplexer" "git.netflux.io/rob/termstream/testhelpers" + "github.com/docker/docker/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -20,7 +21,10 @@ func TestMultiplexer(t *testing.T) { t.Cleanup(cancel) logger := testhelpers.NewTestLogger() - containerClient, err := container.NewClient(ctx, logger) + apiClient, err := client.NewClientWithOpts(client.FromEnv) + require.NoError(t, err) + + containerClient, err := container.NewClient(ctx, apiClient, logger) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, containerClient.Close()) }) @@ -40,7 +44,7 @@ func TestMultiplexer(t *testing.T) { testhelpers.ChanDiscard(srv.C()) time.Sleep(2 * time.Second) - testhelpers.StreamFLV(t, srv.State().URL) + testhelpers.StreamFLV(t, srv.State().RTMPURL) require.Eventually( t, @@ -51,7 +55,7 @@ func TestMultiplexer(t *testing.T) { ) mp := multiplexer.NewActor(ctx, multiplexer.NewActorParams{ - SourceURL: srv.State().URL, + SourceURL: srv.State().RTMPInternalURL, ChanSize: 1, ContainerClient: containerClient, Logger: logger, @@ -61,16 +65,16 @@ func TestMultiplexer(t *testing.T) { requireListeners(t, srv, 0) - mp.ToggleDestination("rtmp://localhost:1936/destination/test1") - mp.ToggleDestination("rtmp://localhost:1936/destination/test2") - mp.ToggleDestination("rtmp://localhost:1936/destination/test3") + mp.ToggleDestination("rtmp://mediaserver:1935/destination/test1") + mp.ToggleDestination("rtmp://mediaserver:1935/destination/test2") + mp.ToggleDestination("rtmp://mediaserver:1935/destination/test3") requireListeners(t, srv, 3) - mp.ToggleDestination("rtmp://localhost:1936/destination/test3") + mp.ToggleDestination("rtmp://mediaserver:1935/destination/test3") requireListeners(t, srv, 2) - mp.ToggleDestination("rtmp://localhost:1936/destination/test2") - mp.ToggleDestination("rtmp://localhost:1936/destination/test1") + mp.ToggleDestination("rtmp://mediaserver:1935/destination/test2") + mp.ToggleDestination("rtmp://mediaserver:1935/destination/test1") requireListeners(t, srv, 0) } diff --git a/multiplexer/multiplexer.go b/multiplexer/multiplexer.go index be413f7..c1e7212 100644 --- a/multiplexer/multiplexer.go +++ b/multiplexer/multiplexer.go @@ -100,8 +100,9 @@ func (a *Actor) ToggleDestination(url string) { Labels: labels, }, HostConfig: &typescontainer.HostConfig{ - NetworkMode: "host", + NetworkMode: "default", }, + NetworkCountConfig: container.NetworkCountConfig{Rx: "eth1", Tx: "eth1"}, }) a.nextIndex++ diff --git a/terminal/actor.go b/terminal/actor.go index 404bd05..64d8b89 100644 --- a/terminal/actor.go +++ b/terminal/actor.go @@ -150,23 +150,29 @@ func (a *Actor) redrawFromState(state domain.AppState) { tableView.SetCell(0, 3, headerCell("[grey]Health", 2)) tableView.SetCell(0, 4, headerCell("[grey]CPU %", 1)) tableView.SetCell(0, 5, headerCell("[grey]Memory MB", 1)) - tableView.SetCell(0, 6, headerCell("[grey]Action", 2)) + tableView.SetCell(0, 6, headerCell("[grey]Rx Kbps", 1)) + tableView.SetCell(0, 7, headerCell("[grey]Tx Kbps", 1)) + tableView.SetCell(0, 8, headerCell("[grey]Action", 2)) } a.sourceView.Clear() setHeaderRow(a.sourceView) - a.sourceView.SetCell(1, 0, tview.NewTableCell(state.Source.URL)) + a.sourceView.SetCell(1, 0, tview.NewTableCell(state.Source.RTMPURL)) if state.Source.Live { a.sourceView.SetCell(1, 1, tview.NewTableCell("[black:green]receiving")) + } else if state.Source.Container.State == "running" && state.Source.Container.HealthState == "healthy" { + a.sourceView.SetCell(1, 1, tview.NewTableCell("[black:yellow]ready")) } else { - a.sourceView.SetCell(1, 1, tview.NewTableCell("[yellow]off-air")) + a.sourceView.SetCell(1, 1, tview.NewTableCell("[white:red]not ready")) } a.sourceView.SetCell(1, 2, tview.NewTableCell("[white]"+state.Source.Container.State)) a.sourceView.SetCell(1, 3, tview.NewTableCell("[white]"+cmp.Or(state.Source.Container.HealthState, "starting"))) a.sourceView.SetCell(1, 4, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", state.Source.Container.CPUPercent))) a.sourceView.SetCell(1, 5, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", float64(state.Source.Container.MemoryUsageBytes)/1024/1024))) - a.sourceView.SetCell(1, 6, tview.NewTableCell("")) + a.sourceView.SetCell(1, 6, tview.NewTableCell("[white]"+fmt.Sprintf("%d", state.Source.Container.RxRate))) + a.sourceView.SetCell(1, 7, tview.NewTableCell("[white]"+fmt.Sprintf("%d", state.Source.Container.TxRate))) + a.sourceView.SetCell(1, 8, tview.NewTableCell("")) a.destView.Clear() setHeaderRow(a.destView) @@ -202,15 +208,24 @@ func (a *Actor) redrawFromState(state domain.AppState) { if dest.Container.State == "running" { cpuPercent = fmt.Sprintf("%.1f", dest.Container.CPUPercent) } + a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]"+cpuPercent)) memoryUsage := dash if dest.Container.State == "running" { memoryUsage = fmt.Sprintf("%.1f", float64(dest.Container.MemoryUsageBytes)/1024/1024) } - - a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]"+cpuPercent)) a.destView.SetCell(i+1, 5, tview.NewTableCell("[white]"+memoryUsage)) - a.destView.SetCell(i+1, 6, tview.NewTableCell("[green]Tab to go live")) + + rxRate := dash + txRate := dash + if dest.Container.State == "running" { + rxRate = "[white]" + fmt.Sprintf("%d", dest.Container.RxRate) + txRate = "[white]" + fmt.Sprintf("%d", dest.Container.TxRate) + } + a.destView.SetCell(i+1, 6, tview.NewTableCell(rxRate)) + a.destView.SetCell(i+1, 7, tview.NewTableCell(txRate)) + + a.destView.SetCell(i+1, 8, tview.NewTableCell("[green]Tab to go live")) } a.app.Draw() diff --git a/testhelpers/docker.go b/testhelpers/docker.go new file mode 100644 index 0000000..977ecf0 --- /dev/null +++ b/testhelpers/docker.go @@ -0,0 +1,20 @@ +package testhelpers + +import ( + "context" + "io" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" +) + +// MockDockerClient is a mock docker client. +type MockDockerClient struct { + *client.Client + + ContainerStatsResponse io.ReadCloser +} + +func (c *MockDockerClient) ContainerStats(context.Context, string, bool) (container.StatsResponseReader, error) { + return container.StatsResponseReader{Body: c.ContainerStatsResponse}, nil +}