refactor(container): restart handling
This commit is contained in:
parent
f7f9843c4b
commit
7eff6c6065
@ -147,6 +147,11 @@ type CopyFileConfig struct {
|
|||||||
Mode int64
|
Mode int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ShouldRestartFunc is a callback function that is called when a container
|
||||||
|
// exits. It should return true if the container is to be restarted. If not
|
||||||
|
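// restarting, err can be non-nil, in which case it is reported as the error
// of the exited container. Returning true together with a non-nil error is a
// programming error and causes a panic.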
|
||||||
|
type ShouldRestartFunc func(exitCode int64, restartCount int, runningTime time.Duration) (bool, error)
|
||||||
|
|
||||||
// RunContainerParams are the parameters for running a container.
|
// RunContainerParams are the parameters for running a container.
|
||||||
type RunContainerParams struct {
|
type RunContainerParams struct {
|
||||||
Name string
|
Name string
|
||||||
@ -156,6 +161,7 @@ type RunContainerParams struct {
|
|||||||
NetworkingConfig *network.NetworkingConfig
|
NetworkingConfig *network.NetworkingConfig
|
||||||
NetworkCountConfig NetworkCountConfig
|
NetworkCountConfig NetworkCountConfig
|
||||||
CopyFileConfigs []CopyFileConfig
|
CopyFileConfigs []CopyFileConfig
|
||||||
|
ShouldRestart ShouldRestartFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunContainer runs a container with the given parameters.
|
// RunContainer runs a container with the given parameters.
|
||||||
@ -164,13 +170,18 @@ type RunContainerParams struct {
|
|||||||
// never be closed. The error channel will receive an error if the container
|
// never be closed. The error channel will receive an error if the container
|
||||||
// fails to start, and will be closed when the container exits, possibly after
|
// fails to start, and will be closed when the container exits, possibly after
|
||||||
// receiving an error.
|
// receiving an error.
|
||||||
|
//
|
||||||
|
// Panics if ShouldRestart is non-nil and the host config defines a restart
|
||||||
|
// policy of its own.
|
||||||
func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<-chan domain.Container, <-chan error) {
|
func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<-chan domain.Container, <-chan error) {
|
||||||
|
if params.ShouldRestart != nil && !params.HostConfig.RestartPolicy.IsNone() {
|
||||||
|
panic("shouldRestart and restart policy are mutually exclusive")
|
||||||
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
containerStateC := make(chan domain.Container, cmp.Or(params.ChanSize, defaultChanSize))
|
containerStateC := make(chan domain.Container, cmp.Or(params.ChanSize, defaultChanSize))
|
||||||
errC := make(chan error, 1)
|
errC := make(chan error, 1)
|
||||||
sendError := func(err error) {
|
sendError := func(err error) { errC <- err }
|
||||||
errC <- err
|
|
||||||
}
|
|
||||||
|
|
||||||
a.wg.Add(1)
|
a.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
@ -229,6 +240,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<
|
|||||||
createResp.ID,
|
createResp.ID,
|
||||||
params.ContainerConfig.Image,
|
params.ContainerConfig.Image,
|
||||||
params.NetworkCountConfig,
|
params.NetworkCountConfig,
|
||||||
|
params.ShouldRestart,
|
||||||
containerStateC,
|
containerStateC,
|
||||||
errC,
|
errC,
|
||||||
)
|
)
|
||||||
@ -314,12 +326,16 @@ func (a *Client) runContainerLoop(
|
|||||||
containerID string,
|
containerID string,
|
||||||
imageName string,
|
imageName string,
|
||||||
networkCountConfig NetworkCountConfig,
|
networkCountConfig NetworkCountConfig,
|
||||||
|
shouldRestart ShouldRestartFunc,
|
||||||
stateC chan<- domain.Container,
|
stateC chan<- domain.Container,
|
||||||
errC chan<- error,
|
errC chan<- error,
|
||||||
) {
|
) {
|
||||||
type containerWaitResponse struct {
|
type containerWaitResponse struct {
|
||||||
container.WaitResponse
|
container.WaitResponse
|
||||||
|
|
||||||
restarting bool
|
restarting bool
|
||||||
|
restartCount int
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
containerRespC := make(chan containerWaitResponse)
|
containerRespC := make(chan containerWaitResponse)
|
||||||
@ -333,33 +349,54 @@ func (a *Client) runContainerLoop(
|
|||||||
// The goroutine exits when a value is received on the error channel, or when
|
// The goroutine exits when a value is received on the error channel, or when
|
||||||
// the container exits and is not restarting, or when the context is cancelled.
|
// the container exits and is not restarting, or when the context is cancelled.
|
||||||
go func() {
|
go func() {
|
||||||
|
const restartDuration = 5 * time.Second
|
||||||
|
timer := time.NewTimer(restartDuration)
|
||||||
|
defer timer.Stop()
|
||||||
|
timer.Stop()
|
||||||
|
var restartCount int
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
startedWaitingAt := time.Now()
|
||||||
respC, errC := a.apiClient.ContainerWait(ctx, containerID, container.WaitConditionNextExit)
|
respC, errC := a.apiClient.ContainerWait(ctx, containerID, container.WaitConditionNextExit)
|
||||||
select {
|
select {
|
||||||
case resp := <-respC:
|
case resp := <-respC:
|
||||||
var restarting bool
|
if shouldRestart != nil {
|
||||||
// Check if the container is restarting. If it is not then we don't
|
shouldRestart, err := shouldRestart(resp.StatusCode, restartCount, time.Since(startedWaitingAt))
|
||||||
// want to wait for it again and can return early.
|
if shouldRestart && err != nil {
|
||||||
ctr, err := a.apiClient.ContainerInspect(ctx, containerID)
|
panic(fmt.Errorf("shouldRestart must return nil error if restarting, but returned: %w", err))
|
||||||
// Race condition: the container may already have been removed.
|
}
|
||||||
if errdefs.IsNotFound(err) {
|
if !shouldRestart {
|
||||||
// ignore error but do not restart
|
a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "false", "exit_code", resp.StatusCode, "restart_count", restartCount)
|
||||||
} else if err != nil {
|
containerRespC <- containerWaitResponse{WaitResponse: resp, restarting: false, err: err}
|
||||||
a.logger.Error("Error inspecting container", "err", err, "id", shortID(containerID))
|
|
||||||
containerErrC <- err
|
|
||||||
return
|
return
|
||||||
// Race condition: the container may have already restarted.
|
}
|
||||||
} else if ctr.State.Status == domain.ContainerStatusRestarting || ctr.State.Status == domain.ContainerStatusRunning {
|
|
||||||
restarting = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
containerRespC <- containerWaitResponse{WaitResponse: resp, restarting: restarting}
|
restartCount++
|
||||||
if !restarting {
|
|
||||||
return
|
// TODO: exponential backoff
|
||||||
|
a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "true", "exit_code", resp.StatusCode, "restart_count", restartCount)
|
||||||
|
timer.Reset(restartDuration)
|
||||||
|
|
||||||
|
containerRespC <- containerWaitResponse{WaitResponse: resp, restarting: true}
|
||||||
|
case <-timer.C:
|
||||||
|
a.logger.Info("Container restarting", "id", shortID(containerID), "restart_count", restartCount)
|
||||||
|
if err := a.apiClient.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
|
||||||
|
containerErrC <- fmt.Errorf("container start: %w", err)
|
||||||
}
|
}
|
||||||
case err := <-errC:
|
case err := <-errC:
|
||||||
// Otherwise, this is probably unexpected and we need to handle it.
|
// If this is a not found error, the container has been removed -
|
||||||
|
// probably by the user. This is a bit hacky, and should be more
|
||||||
|
// explicit, possibly by signalling to this package that the container
|
||||||
|
// has been removed by the user instead of just calling
|
||||||
|
// ContainerRemove.
|
||||||
|
// TODO: improve this
|
||||||
|
if errdefs.IsNotFound(err) {
|
||||||
|
a.logger.Debug("Container not found when setting ContainerWait, ignoring", "id", shortID(containerID))
|
||||||
|
containerRespC <- containerWaitResponse{WaitResponse: container.WaitResponse{}, restarting: false}
|
||||||
|
} else {
|
||||||
containerErrC <- err
|
containerErrC <- err
|
||||||
|
}
|
||||||
return
|
return
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
containerErrC <- ctx.Err()
|
containerErrC <- ctx.Err()
|
||||||
@ -382,20 +419,23 @@ func (a *Client) runContainerLoop(
|
|||||||
a.logger.Info("Container entered non-running state", "exit_code", resp.StatusCode, "id", shortID(containerID), "restarting", resp.restarting)
|
a.logger.Info("Container entered non-running state", "exit_code", resp.StatusCode, "id", shortID(containerID), "restarting", resp.restarting)
|
||||||
|
|
||||||
var containerState string
|
var containerState string
|
||||||
|
var containerErr error
|
||||||
if resp.restarting {
|
if resp.restarting {
|
||||||
containerState = domain.ContainerStatusRestarting
|
containerState = domain.ContainerStatusRestarting
|
||||||
} else {
|
} else {
|
||||||
containerState = domain.ContainerStatusExited
|
containerState = domain.ContainerStatusExited
|
||||||
|
containerErr = resp.err
|
||||||
}
|
}
|
||||||
|
|
||||||
state.Status = containerState
|
state.Status = containerState
|
||||||
|
state.Err = containerErr
|
||||||
|
state.RestartCount = resp.restartCount
|
||||||
state.CPUPercent = 0
|
state.CPUPercent = 0
|
||||||
state.MemoryUsageBytes = 0
|
state.MemoryUsageBytes = 0
|
||||||
state.HealthState = "unhealthy"
|
state.HealthState = "unhealthy"
|
||||||
state.RxRate = 0
|
state.RxRate = 0
|
||||||
state.TxRate = 0
|
state.TxRate = 0
|
||||||
state.RxSince = time.Time{}
|
state.RxSince = time.Time{}
|
||||||
state.RestartCount++
|
|
||||||
|
|
||||||
if !resp.restarting {
|
if !resp.restarting {
|
||||||
exitCode := int(resp.StatusCode)
|
exitCode := int(resp.StatusCode)
|
||||||
@ -406,7 +446,7 @@ func (a *Client) runContainerLoop(
|
|||||||
|
|
||||||
sendState()
|
sendState()
|
||||||
case err := <-containerErrC:
|
case err := <-containerErrC:
|
||||||
// TODO: error handling?
|
// TODO: verify error handling
|
||||||
if err != context.Canceled {
|
if err != context.Canceled {
|
||||||
a.logger.Error("Error setting container wait", "err", err, "id", shortID(containerID))
|
a.logger.Error("Error setting container wait", "err", err, "id", shortID(containerID))
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package replicator
|
|||||||
import (
|
import (
|
||||||
"cmp"
|
"cmp"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -105,11 +106,20 @@ func (a *Actor) StartDestination(url string) {
|
|||||||
container.LabelURL: url,
|
container.LabelURL: url,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
HostConfig: &typescontainer.HostConfig{
|
HostConfig: &typescontainer.HostConfig{NetworkMode: "default"},
|
||||||
NetworkMode: "default",
|
|
||||||
RestartPolicy: typescontainer.RestartPolicy{Name: "always"},
|
|
||||||
},
|
|
||||||
NetworkCountConfig: container.NetworkCountConfig{Rx: "eth1", Tx: "eth0"},
|
NetworkCountConfig: container.NetworkCountConfig{Rx: "eth1", Tx: "eth0"},
|
||||||
|
ShouldRestart: func(_ int64, restartCount int, runningTime time.Duration) (bool, error) {
|
||||||
|
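// Try to infer if the container failed to start: an exit before any
// restart has happened, and less than 10 seconds after we began waiting
// on the container, is treated as a startup failure.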
|
||||||
|
//
|
||||||
|
// TODO: this is a bit hacky, we should check the container logs and
|
||||||
|
// include some details in the error message.
|
||||||
|
if restartCount == 0 && runningTime < 10*time.Second {
|
||||||
|
return false, errors.New("container failed to start")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, always restart, regardless of the exit code.
|
||||||
|
return true, nil
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
a.wg.Add(1)
|
a.wg.Add(1)
|
||||||
|
@ -366,8 +366,6 @@ func (ui *UI) ShowStartupCheckModal() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ui *UI) ShowDestinationErrorModal(name string, err error) {
|
func (ui *UI) ShowDestinationErrorModal(name string, err error) {
|
||||||
done := make(chan struct{})
|
|
||||||
|
|
||||||
ui.app.QueueUpdateDraw(func() {
|
ui.app.QueueUpdateDraw(func() {
|
||||||
ui.showModal(
|
ui.showModal(
|
||||||
pageNameModalDestinationError,
|
pageNameModalDestinationError,
|
||||||
@ -377,13 +375,9 @@ func (ui *UI) ShowDestinationErrorModal(name string, err error) {
|
|||||||
err,
|
err,
|
||||||
),
|
),
|
||||||
[]string{"Ok"},
|
[]string{"Ok"},
|
||||||
func(int, string) {
|
nil,
|
||||||
done <- struct{}{}
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
<-done
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShowFatalErrorModal displays the provided error. It sends a CommandQuit to the
|
// ShowFatalErrorModal displays the provided error. It sends a CommandQuit to the
|
||||||
|
Loading…
x
Reference in New Issue
Block a user