Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion deployment/dockerdeploy/deployer_clusterinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ type nodeInfoEx struct {
Status string
OTPNode string
Services []clusterdef.Service
IsClusterOrchestrator bool
ClusterNeedsRebalance bool
}

Expand All @@ -184,11 +185,21 @@ func (d *Deployer) getNodeInfoEx(ctx context.Context, nodeInfo *nodeInfo) (*node
Logger: d.logger,
Endpoint: fmt.Sprintf("http://%s:8091", nodeInfo.IPAddress),
}

thisNodeInfo, err := nodeCtrl.Controller().GetLocalInfo(ctx)
if err != nil {
// there are cases where we want to fetch extended cluster information while
// one of the nodes will not respond to this endpoint so we consider this non-fatal
d.logger.Info("failed to get extended node info, skipping",
zap.String("node", nodeInfo.Name), zap.Error(err))
return nodeEx, nil
}

terseClusterInfo, err := nodeCtrl.Controller().GetTerseClusterInfo(ctx)
if err != nil {
// there are cases where we want to fetch extended cluster information while
// one of the nodes will not respond to this endpoint so we consider this non-fatal
d.logger.Info("failed to get terse cluster info, skipping",
zap.String("node", nodeInfo.Name))
return nodeEx, nil
}
Expand All @@ -201,7 +212,8 @@ func (d *Deployer) getNodeInfoEx(ctx context.Context, nodeInfo *nodeInfo) (*node
nodeEx.Status = thisNodeInfo.Status
nodeEx.OTPNode = thisNodeInfo.OTPNode
nodeEx.Services = services
nodeEx.ClusterNeedsRebalance = thisNodeInfo.ClusterNeedsRebalance
nodeEx.ClusterNeedsRebalance = !terseClusterInfo.IsBalanced
nodeEx.IsClusterOrchestrator = (terseClusterInfo.Orchestrator == thisNodeInfo.OTPNode)

return nodeEx, nil
}
Expand Down
132 changes: 83 additions & 49 deletions deployment/dockerdeploy/deployer_rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,56 @@ func (d *Deployer) reconcileRebalance(
return d.reconcileRebalanceWithRetry(ctx, clusterInfoEx, otpsToRemove, lastAllowedRetryTime)
}

// checkClusterIsBalanced fetches the extended cluster info via
// getClusterInfoEx and reports whether the cluster looks settled after a
// rebalance.
//
// It returns (false, nil) when any of the following holds:
//   - the number of nodes flagged as cluster orchestrator is not exactly one,
//   - the orchestrator node reports that the cluster needs a rebalance,
//   - a node has a non-empty status other than "healthy",
//   - a node listed in otpsToRemove is still present.
//
// Nodes with an empty OTPNode are ignored by the per-node checks. A non-nil
// error is returned only when the extended cluster info cannot be fetched.
func (d *Deployer) checkClusterIsBalanced(
	ctx context.Context,
	clusterInfo *clusterInfo,
	otpsToRemove []string,
) (bool, error) {
	infoEx, err := d.getClusterInfoEx(ctx, clusterInfo)
	if err != nil {
		return false, errors.Wrap(err, "failed to get extended cluster info for rebalance")
	}

	// Exactly one node is expected to advertise itself as the orchestrator.
	// Zero or several means something went wrong, so report "not balanced"
	// and let the caller run the rebalance again.
	var orchestratorCount int
	for _, candidate := range infoEx.NodesEx {
		if candidate.IsClusterOrchestrator {
			orchestratorCount++
		}
	}
	if orchestratorCount != 1 {
		d.logger.Info("unexpected number of orchestrators after rebalance",
			zap.Int("num_orchestrators", orchestratorCount))
		return false, nil
	}

	for _, member := range infoEx.NodesEx {
		switch {
		case member.OTPNode == "":
			// no OTPNode info means it is probably not actually in the cluster
			continue

		case member.IsClusterOrchestrator && member.ClusterNeedsRebalance:
			// only the orchestrator's view of the balance state is consulted
			d.logger.Info("cluster still needs rebalance after rebalance operation")
			return false, nil

		case member.Status != "" && member.Status != "healthy":
			d.logger.Info("node unhealthy after rebalance", zap.String("node", member.OTPNode))
			return false, nil

		case slices.Contains(otpsToRemove, member.OTPNode):
			d.logger.Info("node unexpectedly still present after rebalance", zap.String("node", member.OTPNode))
			return false, nil
		}
	}

	return true, nil
}

func (d *Deployer) reconcileRebalanceWithRetry(
ctx context.Context,
clusterInfoEx *clusterInfoEx,
Expand Down Expand Up @@ -88,70 +138,54 @@ func (d *Deployer) reconcileRebalanceWithRetry(

d.logger.Info("validating post-rebalance state")

clusterInfoEx, err = d.getClusterInfoEx(ctx, clusterInfo)
clusterIsBalanced, err := d.checkClusterIsBalanced(ctx, clusterInfo, otpsToRemove)
if err != nil {
return errors.Wrap(err, "failed to get extended cluster info for rebalance")
return errors.Wrap(err, "failed to validate cluster state after rebalance")
}

rebalanceSuccess := true
if !clusterIsBalanced {
// if the cluster is not balanced immediately after the rebalance, we wait 15 seconds
// and check again to see if the state was just stale. If it still needs a rebalance,
// we trigger another rebalance to try and resolve the issue.
d.logger.Info("cluster not balanced after rebalance, waiting 15 seconds to re-validate")
time.Sleep(15 * time.Second)

for _, node := range clusterInfoEx.NodesEx {
if node.OTPNode == "" {
// no OTPNode info means it is probably not actually in the cluster
continue
clusterIsBalanced, err = d.checkClusterIsBalanced(ctx, clusterInfo, otpsToRemove)
if err != nil {
return errors.Wrap(err, "failed to validate cluster state after rebalance")
}

if node.ClusterNeedsRebalance {
d.logger.Info("cluster still needs rebalance after rebalance operation")
rebalanceSuccess = false
break
}
if !clusterIsBalanced {
allowedTimeLeft := time.Until(lastAllowedRetryTime)
d.logger.Info("cluster still not balanced, assuming failure and retrying", zap.Duration("time_left", allowedTimeLeft))

if node.Status != "" && node.Status != "healthy" {
d.logger.Info("node unhealthy after rebalance", zap.String("node", node.OTPNode))
rebalanceSuccess = false
break
}

if slices.Contains(otpsToRemove, node.OTPNode) {
d.logger.Info("node unexpectedly still present after rebalance", zap.String("node", node.OTPNode))
rebalanceSuccess = false
break
}
}

if !rebalanceSuccess {
allowedTimeLeft := time.Until(lastAllowedRetryTime)
d.logger.Info("rebalance did not complete successfully, retrying", zap.Duration("time_left", allowedTimeLeft))
allNodeOtps := make([]string, 0)
for _, clusterNode := range clusterInfoEx.NodesEx {
if !clusterNode.IsClusterNode() {
continue
}

allNodeOtps := make([]string, 0)
for _, clusterNode := range clusterInfoEx.NodesEx {
if !clusterNode.IsClusterNode() {
continue
}
if clusterNode.OTPNode == "" {
// no OTPNode info means it's probably not actually in the cluster
continue
}

if clusterNode.OTPNode == "" {
// no OTPNode info means it's probably not actually in the cluster
continue
allNodeOtps = append(allNodeOtps, clusterNode.OTPNode)
}

allNodeOtps = append(allNodeOtps, clusterNode.OTPNode)
}
// if we have any nodes to remove that are not actually in the cluster we skip them
var newOtpsToRemove []string
for _, otpToRemove := range otpsToRemove {
if !slices.Contains(allNodeOtps, otpToRemove) {
d.logger.Info("node to remove not found in actual cluster, skipping", zap.String("node", otpToRemove))
continue
}

// if we have any nodes to remove that are not actually in the cluster we skip them
var newOtpsToRemove []string
for _, otpToRemove := range otpsToRemove {
if !slices.Contains(allNodeOtps, otpToRemove) {
d.logger.Info("node to remove not found in actual cluster, skipping", zap.String("node", otpToRemove))
continue
newOtpsToRemove = append(newOtpsToRemove, otpToRemove)
}

newOtpsToRemove = append(newOtpsToRemove, otpToRemove)
return d.reconcileRebalanceWithRetry(ctx, clusterInfoEx, newOtpsToRemove, lastAllowedRetryTime)
}

// wait 5 seconds and then retry the rebalance
time.Sleep(5 * time.Second)
return d.reconcileRebalanceWithRetry(ctx, clusterInfoEx, newOtpsToRemove, lastAllowedRetryTime)
}

return nil
Expand Down
31 changes: 29 additions & 2 deletions utils/clustercontrol/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *Controller) doReq(ctx context.Context, req *http.Request, out interface

resp, err := client.Do(req)
if err != nil {
return errors.Wrap(err, "failed to execute request")
return errors.Wrap(err, "failed to do request")
}

defer resp.Body.Close()
Expand Down Expand Up @@ -100,7 +100,7 @@ func (c *Controller) doRetriableReq(ctx context.Context, makeReq func() (*http.R
return errors.Wrap(err, fmt.Sprintf("failed after %d retries", maxRetries))
}

c.Logger.Warn("request failed, retrying",
c.Logger.Debug("request failed, retrying",
zap.Int("retryNum", retryNum),
zap.Error(err),
)
Expand Down Expand Up @@ -497,6 +497,33 @@ func (c *Controller) GetLocalInfo(ctx context.Context) (*LocalInfo, error) {
return nil, errors.New("no node was marked as this node")
}

// TerseClusterInfo holds the fields that GetTerseClusterInfo decodes from the
// /pools/default/terseClusterInfo response.
type TerseClusterInfo struct {
// ClusterUUID is the "clusterUUID" value from the response.
ClusterUUID string
// Orchestrator is the "orchestrator" value from the response. Callers
// compare it against a node's OTP node name to tell whether that node is
// the orchestrator.
Orchestrator string
// IsBalanced is the "isBalanced" value from the response; false is treated
// by callers as the cluster needing a rebalance.
IsBalanced bool
// ClusterCompatVersion is the "clusterCompatVersion" value from the response.
ClusterCompatVersion string
}

// GetTerseClusterInfo issues a GET to /pools/default/terseClusterInfo and
// returns the decoded cluster UUID, orchestrator, balance flag and cluster
// compatibility version. Any error from the request is returned as-is.
func (c *Controller) GetTerseClusterInfo(ctx context.Context) (*TerseClusterInfo, error) {
	// Wire format of the endpoint. The fields mirror TerseClusterInfo one for
	// one, so the decoded value can be converted directly (struct tags are
	// ignored by conversions).
	var payload struct {
		ClusterUUID          string `json:"clusterUUID"`
		Orchestrator         string `json:"orchestrator"`
		IsBalanced           bool   `json:"isBalanced"`
		ClusterCompatVersion string `json:"clusterCompatVersion"`
	}

	if err := c.doGet(ctx, "/pools/default/terseClusterInfo", &payload); err != nil {
		return nil, err
	}

	info := TerseClusterInfo(payload)
	return &info, nil
}

func (c *Controller) ListNodeOTPs(ctx context.Context) ([]string, error) {
var resp struct {
Nodes []struct {
Expand Down