Skip to content

Commit

Permalink
feat: kubeconfig和云帐号导入集群优化
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyudeqiu committed Nov 14, 2024
1 parent 4029c0a commit 60043c0
Show file tree
Hide file tree
Showing 9 changed files with 5 additions and 448 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,7 @@ import (
"time"

"github.com/Tencent/bk-bcs/bcs-common/common/blog"
k8scorev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

proto "github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/api/clustermanager"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/cloudprovider"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/common"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/remote/encrypt"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/utils"
)

// RegisterClusterKubeConfigTask register cluster kubeConfig connection
Expand Down Expand Up @@ -103,15 +94,6 @@ func ImportClusterNodesTask(taskID string, stepName string) error {
return retErr
}

// import cluster instances
err = importClusterInstances(basicInfo)
if err != nil {
blog.Errorf("ImportClusterNodesTask[%s]: importClusterInstances failed: %v", taskID, err)
retErr := fmt.Errorf("importClusterInstances failed, %s", err.Error())
_ = state.UpdateStepFailure(start, stepName, retErr)
return retErr
}

// update cluster masterNodes info
err = cloudprovider.GetStorageModel().UpdateCluster(context.Background(), basicInfo.Cluster)
if err != nil {
Expand All @@ -127,54 +109,3 @@ func ImportClusterNodesTask(taskID string, stepName string) error {

return nil
}

func importClusterInstances(data *cloudprovider.CloudDependBasicInfo) error {
kubeConfig, err := encrypt.Decrypt(nil, data.Cluster.KubeConfig)
if err != nil {
return fmt.Errorf("decode kube config failed: %v", err)
}
kubeConfigByte := []byte(kubeConfig)

config, err := clientcmd.RESTConfigFromKubeConfig(kubeConfigByte)
if err != nil {
return fmt.Errorf("build rest config failed: %v", err)
}

config.Burst = 200
config.QPS = 100
kubeCli, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("build kube client failed: %s", err)
}

nodes, err := kubeCli.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
if err != nil {
return fmt.Errorf("list nodes failed, %s", err.Error())
}

err = importClusterNodesToCM(context.Background(), nodes.Items, data.Cluster.ClusterID)
if err != nil {
return err
}

return nil
}

func importClusterNodesToCM(ctx context.Context, nodes []k8scorev1.Node, clusterID string) error {
for i := range nodes {
ipv4, ipv6 := utils.GetNodeIPAddress(&nodes[i])
node := &proto.Node{
InnerIP: utils.SliceToString(ipv4),
InnerIPv6: utils.SliceToString(ipv6),
Status: common.StatusRunning,
NodeName: nodes[i].Name,
ClusterID: clusterID,
}
err := cloudprovider.GetStorageModel().CreateNode(ctx, node)
if err != nil {
blog.Errorf("ImportClusterNodesToCM CreateNode[%s] failed: %v", nodes[i].Name, err)
}
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,6 @@ func importClusterInstances(data *cloudprovider.CloudDependBasicInfo) error {
}
}

err = importClusterNodesToCM(context.Background(), nodes.Items, data.Cluster.ClusterID)
if err != nil {
return err
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,59 +14,19 @@ package tasks

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute"
"github.com/Tencent/bk-bcs/bcs-common/common/blog"
"github.com/Tencent/bk-bcs/bcs-common/pkg/odm/drivers"
k8scorev1 "k8s.io/api/core/v1"

proto "github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/api/clustermanager"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/cloudprovider"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/cloudprovider/azure/api"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/common"
)

func importClusterNodesToCM(ctx context.Context, nodes []k8scorev1.Node, clusterID string) error {
for _, n := range nodes {
innerIP := ""
for _, v := range n.Status.Addresses {
if v.Type == k8scorev1.NodeInternalIP {
innerIP = v.Address
break
}
}
if innerIP == "" {
continue
}
node, err := cloudprovider.GetStorageModel().GetNodeByIP(ctx, innerIP)
if err != nil && !errors.Is(err, drivers.ErrTableRecordNotFound) {
blog.Errorf("importClusterNodes GetNodeByIP[%s] failed: %v", innerIP, err)
// no import node when found err
continue
}

if node == nil {
node = &proto.Node{
InnerIP: innerIP,
Status: common.StatusRunning,
ClusterID: clusterID,
}
err = cloudprovider.GetStorageModel().CreateNode(ctx, node)
if err != nil {
blog.Errorf("importClusterNodes CreateNode[%s] failed: %v", innerIP, err)
}
continue
}
}

return nil
}

func setModuleInfo(group *proto.NodeGroup, bkBizIDString string) {
if group.NodeTemplate != nil && group.NodeTemplate.Module != nil &&
len(group.NodeTemplate.Module.ScaleOutModuleID) != 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ func importClusterInstances(data *cloudprovider.CloudDependBasicInfo) error {
data.Cluster.Master = masterNodes
// data.Cluster.Status = icommon.StatusRunning

// 导入方式是 kubeconfig 集群节点不写入数据库
if data.Cluster.ClusterCategory == icommon.Importer && data.Cluster.ImportCategory == icommon.KubeConfigImport {
return nil
}

err = importClusterNodesToCM(context.Background(), nodeIPs, &cloudprovider.ListNodesOption{
Common: data.CmOption,
ClusterVPCID: data.Cluster.VpcID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,17 @@ package tasks

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/Tencent/bk-bcs/bcs-common/common/blog"
k8scorev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

proto "github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/api/clustermanager"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/cloudprovider"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/cloudprovider/google/api"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/clusterops"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/common"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/remote/encrypt"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/types"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cluster-manager/internal/utils"
)

// ImportClusterNodesTask call gkeInterface or kubeConfig import cluster nodes
Expand Down Expand Up @@ -61,15 +54,6 @@ func ImportClusterNodesTask(taskID string, stepName string) error {
return retErr
}

// import cluster instances
err = importClusterInstances(basicInfo)
if err != nil {
blog.Errorf("ImportClusterNodesTask[%s]: importClusterInstances failed: %v", taskID, err)
retErr := fmt.Errorf("importClusterInstances failed, %s", err.Error())
_ = state.UpdateStepFailure(start, stepName, retErr)
return retErr
}

// update cluster masterNodes info
_ = cloudprovider.GetStorageModel().UpdateCluster(context.Background(), basicInfo.Cluster)

Expand Down Expand Up @@ -161,91 +145,3 @@ func importClusterCredential(ctx context.Context, data *cloudprovider.CloudDepen

return nil
}

func importClusterInstances(data *cloudprovider.CloudDependBasicInfo) error {
config, _ := encrypt.Decrypt(nil, data.Cluster.KubeConfig)
kubeRet := base64.StdEncoding.EncodeToString([]byte(config))

kubeCli, err := clusterops.NewKubeClient(kubeRet)
if err != nil {
return fmt.Errorf("importClusterInstances NewKubeClient failed: %v", err)
}

nodes, err := kubeCli.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
if err != nil {
return fmt.Errorf("list nodes failed, %s", err.Error())
}

// get container runtime info here due to GKE API is not support
if len(nodes.Items) > 0 {
crv := strings.Split(nodes.Items[0].Status.NodeInfo.ContainerRuntimeVersion, "://")
if len(crv) == 2 {
data.Cluster.ClusterAdvanceSettings = &proto.ClusterAdvanceSetting{
ContainerRuntime: crv[0],
RuntimeVersion: crv[1],
}
err = cloudprovider.GetStorageModel().UpdateCluster(context.Background(), data.Cluster)
if err != nil {
blog.Errorf("importClusterInstances update cluster[%s] failed: %v", data.Cluster.ClusterName, err)
}
}
}

gceCli, err := api.NewComputeServiceClient(data.CmOption)
if err != nil {
return fmt.Errorf("get gce client failed, %s", err.Error())
}

err = importClusterNodesToCM(context.Background(), gceCli, nodes.Items, data.Cluster.ClusterID)
if err != nil {
return err
}

return nil
}

// ImportClusterNodesToCM writes cluster nodes to DB
func importClusterNodesToCM(
ctx context.Context, gceCli *api.ComputeServiceClient, nodes []k8scorev1.Node, clusterID string) error {

for _, v := range nodes {
nodeZone := ""
zone, ok := v.Labels[utils.ZoneKubernetesFlag]
if ok {
nodeZone = zone
}
zone, ok = v.Labels[utils.ZoneTopologyFlag]
if ok && nodeZone == "" {
nodeZone = zone
}

var (
node = &proto.Node{}
)

instance, err := gceCli.GetInstance(ctx, nodeZone, v.Name)
if err == nil {
node = api.InstanceToNode(gceCli, instance)
} else {
blog.Errorf("ImportClusterNodesToCM failed: %v", err)
node.Region = v.Labels[utils.RegionTopologyFlag]
node.InstanceType = v.Labels[utils.NodeInstanceTypeFlag]
node.NodeName = v.Labels[utils.NodeNameFlag]
}

ipv4, ipv6 := utils.GetNodeIPAddress(&v)
node.ZoneName = nodeZone
node.InnerIP = utils.SliceToString(ipv4)
node.InnerIPv6 = utils.SliceToString(ipv6)
node.ClusterID = clusterID
node.Status = common.StatusRunning

err = cloudprovider.GetStorageModel().CreateNode(ctx, node)
if err != nil {
blog.Errorf("ImportClusterNodesToCM CreateNode[%s] failed: %v", v.Name, err)
continue
}
}

return nil
}
Loading

0 comments on commit 60043c0

Please sign in to comment.