Skip to content

Commit

Permalink
完善优化腾讯自研云支持大批量手动上架节点-分批添加
Browse files — browse the repository at this point in the history
  • Loading branch information
dove0012 committed Nov 27, 2024
1 parent fd02489 commit 754e339
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 66 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -402,8 +402,8 @@ func CheckCvmInstanceState(ctx context.Context, ids []string,
)

idChunks := utils.SplitStringsChunks(ids, common.ClusterAddNodesLimit)
for _, v := range idChunks {
ins, err := checkCvmInstance(ctx, client, v, taskId)
for _, ids := range idChunks {
ins, err := checkCvmInstance(ctx, client, ids, taskId)
if err != nil {
return nil, err
}
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -944,8 +944,8 @@ func CheckClusterDeletedNodes(ctx context.Context, info *cloudprovider.CloudDepe
func CheckClusterInstanceStatus(ctx context.Context, info *cloudprovider.CloudDependBasicInfo, // nolint
instanceIDs []string) ([]string, []string, error) {
var (
addSuccessNodes = make([]string, 0)
addFailureNodes = make([]string, 0)
allAddSuccessNodes = make([]string, 0)
allAddFailureNodes = make([]string, 0)
)

taskID, stepName := cloudprovider.GetTaskIDAndStepNameFromContext(ctx)
Expand All @@ -957,88 +957,100 @@ func CheckClusterInstanceStatus(ctx context.Context, info *cloudprovider.CloudDe
return nil, nil, err
}

// wait node group state to normal
timeCtx, cancel := context.WithTimeout(context.TODO(), 12*time.Minute)
defer cancel()
idChunks := utils.SplitStringsChunks(instanceIDs, common.ClusterAddNodesLimit)
for _, ids := range idChunks {
var (
addSuccessNodes = make([]string, 0)
addFailureNodes = make([]string, 0)
)

// wait all nodes to be ready
err = loop.LoopDoFunc(timeCtx, func() error {
instances, errQuery := cli.QueryTkeClusterInstances(&api.DescribeClusterInstances{
ClusterID: info.Cluster.SystemID,
InstanceIDs: instanceIDs,
})
if errQuery != nil {
blog.Errorf("checkClusterInstanceStatus[%s] QueryTkeClusterInstances failed: %v", taskID, errQuery)
return nil
}
// wait node group state to normal
timeCtx, cancel := context.WithTimeout(context.TODO(), 12*time.Minute)
defer cancel()

index := 0
running, failure := make([]string, 0), make([]string, 0)
for _, ins := range instances {
blog.Infof("checkClusterInstanceStatus[%s] instance[%s] status[%s]", taskID, *ins.InstanceId, *ins.InstanceState)
switch *ins.InstanceState {
case api.RunningInstanceTke.String():
running = append(running, *ins.InstanceId)
index++
case api.FailedInstanceTke.String():
failure = append(failure, *ins.InstanceId)
index++
default:
// wait all nodes to be ready
err = loop.LoopDoFunc(timeCtx, func() error {
instances, errQuery := cli.QueryTkeClusterInstances(&api.DescribeClusterInstances{
ClusterID: info.Cluster.SystemID,
InstanceIDs: ids,
})
if errQuery != nil {
blog.Errorf("checkClusterInstanceStatus[%s] QueryTkeClusterInstances failed: %v", taskID, errQuery)
return nil
}
}

if index == len(instanceIDs) {
addSuccessNodes = running
addFailureNodes = failure
return loop.EndLoop
}
index := 0
running, failure := make([]string, 0), make([]string, 0)
for _, ins := range instances {
blog.Infof("checkClusterInstanceStatus[%s] instance[%s] status[%s]", taskID, *ins.InstanceId, *ins.InstanceState)
switch *ins.InstanceState {
case api.RunningInstanceTke.String():
running = append(running, *ins.InstanceId)
index++
case api.FailedInstanceTke.String():
failure = append(failure, *ins.InstanceId)
index++
default:
}
}

return nil
}, loop.LoopInterval(20*time.Second))
// other error
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
blog.Errorf("checkClusterInstanceStatus[%s] QueryTkeClusterInstances failed: %v", taskID, err)
return nil, nil, err
}
// timeout error
if errors.Is(err, context.DeadlineExceeded) {
blog.Errorf("checkClusterInstanceStatus[%s] QueryTkeClusterInstances failed: %v", taskID, err)
if index == len(ids) {
addSuccessNodes = running
addFailureNodes = failure
return loop.EndLoop
}

running, failure := make([]string, 0), make([]string, 0)
instances, errQuery := cli.QueryTkeClusterInstances(&api.DescribeClusterInstances{
ClusterID: info.Cluster.SystemID,
InstanceIDs: instanceIDs,
})
if errQuery != nil {
blog.Errorf("checkClusterInstanceStatus[%s] QueryTkeClusterInstances failed: %v", taskID, errQuery)
return nil, nil, errQuery
return nil
}, loop.LoopInterval(20*time.Second))
// other error
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
blog.Errorf("checkClusterInstanceStatus[%s] QueryTkeClusterInstances failed: %v", taskID, err)
return nil, nil, err
}
for _, ins := range instances {
blog.Infof("checkClusterInstanceStatus[%s] instance[%s] status[%s]", taskID, *ins.InstanceId, *ins.InstanceState)
switch *ins.InstanceState {
case api.RunningInstanceTke.String():
running = append(running, *ins.InstanceId)
default:
failure = append(failure, *ins.InstanceId)
// timeout error
if errors.Is(err, context.DeadlineExceeded) {
blog.Errorf("checkClusterInstanceStatus[%s] QueryTkeClusterInstances failed: %v", taskID, err)

running, failure := make([]string, 0), make([]string, 0)
instances, errQuery := cli.QueryTkeClusterInstances(&api.DescribeClusterInstances{
ClusterID: info.Cluster.SystemID,
InstanceIDs: ids,
})
if errQuery != nil {
blog.Errorf("checkClusterInstanceStatus[%s] QueryTkeClusterInstances failed: %v", taskID, errQuery)
return nil, nil, errQuery
}
for _, ins := range instances {
blog.Infof("checkClusterInstanceStatus[%s] instance[%s] status[%s]", taskID, *ins.InstanceId, *ins.InstanceState)
switch *ins.InstanceState {
case api.RunningInstanceTke.String():
running = append(running, *ins.InstanceId)
default:
failure = append(failure, *ins.InstanceId)
}
}
addSuccessNodes = running
addFailureNodes = failure
}
addSuccessNodes = running
addFailureNodes = failure

allAddSuccessNodes = append(allAddSuccessNodes, addSuccessNodes...)
allAddFailureNodes = append(allAddFailureNodes, addFailureNodes...)

blog.Infof("checkClusterInstanceStatus[%s] success[%v] failure[%v]", taskID, addSuccessNodes, addFailureNodes)
}
blog.Infof("checkClusterInstanceStatus[%s] success[%v] failure[%v]", taskID, addSuccessNodes, addFailureNodes)

cloudprovider.GetStorageModel().CreateTaskStepLogInfo(context.Background(), taskID, stepName,
fmt.Sprintf("success [%v] failure [%v]", addSuccessNodes, addFailureNodes))
fmt.Sprintf("success [%v] failure [%v]", allAddSuccessNodes, allAddFailureNodes))

// set cluster node status
for _, n := range addFailureNodes {
for _, n := range allAddFailureNodes {
err = cloudprovider.UpdateNodeStatus(false, n, common.StatusAddNodesFailed)
if err != nil {
blog.Errorf("checkClusterInstanceStatus[%s] UpdateNodeStatusByInstanceID[%s] failed: %v", taskID, n, err)
}
}

return addSuccessNodes, addFailureNodes, nil
return allAddSuccessNodes, allAddFailureNodes, nil
}

// GetFailedNodesReason get add nodes failed reason
Expand Down

0 comments on commit 754e339

Please sign in to comment.