Skip to content

Commit

Permalink
Merge pull request TencentBlueKing#8072 from tbs60/master
Browse files Browse the repository at this point in the history
bug: 多k8s集群支持问题修复 TencentBlueKing#8046
  • Loading branch information
tming authored Dec 1, 2022
2 parents 4ed1a42 + 7466231 commit 2d0f59e
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/backend/booster/server/pkg/api/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func getTaskInfo(taskID string) (*RespTaskInfo, error) {
if tb.Status.Status == engine.TaskStatusStaging {
rank, err = defaultManager.GetTaskRank(taskID)
if err != nil {
blog.Errorf("get apply param: get task(q%s) rank from engine(%s) queue(%s) failed: %v",
blog.Warnf("get apply param: get task(%s) rank from engine(%s) queue(%s) failed: %v",
taskID, tb.Client.EngineName.String(), tb.Client.QueueName, err)
rank = 0
}
Expand Down
11 changes: 10 additions & 1 deletion src/backend/booster/server/pkg/manager/normal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (m *manager) GetTaskRank(taskID string) (int, error) {

rank, err := qg.GetQueue(tb.Client.QueueName).Rank(taskID)
if err != nil {
blog.Errorf("manager: try getting task rank, get task(%s) rank from engine(%s) queue(%s) failed: %v",
blog.Warnf("manager: try getting task rank, get task(%s) rank from engine(%s) queue(%s) failed: %v",
taskID, tb.Client.EngineName, tb.Client.QueueName, err)
return -1, err
}
Expand Down Expand Up @@ -632,6 +632,15 @@ func generateTaskID(egnName string, projectID string) string {
taskIDFormat, egnName, projectID, time.Now().Unix(), strings.ToLower(util.RandomString(taskIDRandomLength)))
}

// IsOldTaskType reports whether id uses the old task ID layout, i.e. whether
// the last "-" in id is followed by exactly taskIDRandomLength trailing
// characters (presumably the random suffix appended by generateTaskID —
// confirm against taskIDFormat).
//
// An id that contains no "-" at all is never considered old. The explicit
// idx >= 0 guard is required: strings.LastIndex returns -1 when the separator
// is absent, and len(id)-taskIDRandomLength-1 is also -1 for a dash-less id
// whose length equals taskIDRandomLength, which would otherwise match.
func IsOldTaskType(id string) bool {
	idx := strings.LastIndex(id, "-")
	return idx >= 0 && idx == len(id)-taskIDRandomLength-1
}

func ip2long(ipStr string) (uint32, error) {
ip := net.ParseIP(ipStr)
if ip == nil {
Expand Down
31 changes: 21 additions & 10 deletions src/backend/booster/server/pkg/resource/crm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
commonMySQL "github.com/Tencent/bk-ci/src/booster/common/mysql"
"github.com/Tencent/bk-ci/src/booster/server/config"
"github.com/Tencent/bk-ci/src/booster/server/pkg/engine"
"github.com/Tencent/bk-ci/src/booster/server/pkg/manager/normal"
rsc "github.com/Tencent/bk-ci/src/booster/server/pkg/resource"
op "github.com/Tencent/bk-ci/src/booster/server/pkg/resource/crm/operator"
"github.com/Tencent/bk-ci/src/booster/server/pkg/resource/crm/operator/k8s"
Expand All @@ -32,7 +33,7 @@ import (
)

//DefaultNamespace define default namespace for resourcemanager
const DefaultNamespace = "disttask"
//const DefaultNamespace = "disttask"

// NewResourceManager get a new container resource manager.
func NewResourceManager(
Expand Down Expand Up @@ -403,12 +404,22 @@ func (rm *resourceManager) recover() error {

rm.registeredResourceMap = make(map[string]*resource, 1000)
for _, r := range rl {
if rm.conf.Operator == config.CRMOperatorK8S && rm.conf.InstanceType != nil {
isBelongToRm := false
for _, ist := range rm.conf.InstanceType {
if ist.Group == r.param.City && ist.Platform == r.param.Platform {
isBelongToRm = true
}
}
if !isBelongToRm {
continue
}
}
rm.registeredResourceMap[r.resourceID] = r

if r.noReadyInstance <= 0 {
continue
}

// recover the no-ready records
rm.nodeInfoPool.RecoverNoReadyBlock(r.resourceBlockKey, r.noReadyInstance)
blog.Infof("crm: recover no-ready-instance(%d) from resource(%s)", r.noReadyInstance, r.resourceID)
Expand All @@ -418,13 +429,10 @@ func (rm *resourceManager) recover() error {
}

func (rm *resourceManager) sync() {
if rm.conf.BcsNamespace == "" {
rm.conf.BcsNamespace = DefaultNamespace
}

nodeInfoList, err := rm.operator.GetResource(rm.conf.BcsClusterID)
if err != nil {
blog.Errorf("crm: sync resource failed: %v", err)
blog.Errorf("crm: sync resource failed, clusterId(%s): %v", rm.conf.BcsClusterID, err)
return
}

Expand Down Expand Up @@ -670,8 +678,8 @@ func (rm *resourceManager) getServiceInfo(resourceID, user string) (*op.ServiceI
}
info, err := rm.operator.GetServerStatus(rm.conf.BcsClusterID, rm.handlerMap[user].GetNamespace(), targetID)
if err != nil {
blog.Errorf("crm: get service info for resource(%s) target(%s) user(%s) failed: %v",
resourceID, targetID, user, err)
blog.Errorf("crm: get service info for resource(%s) target(%s) user(%s) namespace(%s) failed: %v",
resourceID, targetID, user, rm.handlerMap[user].GetNamespace(), err)
return nil, err
}

Expand Down Expand Up @@ -949,7 +957,7 @@ func (rm *resourceManager) release(resourceID, user string) error {

r, err := rm.getResources(resourceID)
if err != nil {
blog.Errorf("crm: try releasing service, get resource(%s) for user(%s) failed: %v",
blog.Warnf("crm: try releasing service, get resource(%s) for user(%s) failed: %v",
resourceID, user, err)
return err
}
Expand Down Expand Up @@ -1151,10 +1159,13 @@ func (hwu *handlerWithUser) GetNamespace() string {
if hwu.mgr.conf.BcsNamespace != "" {
return hwu.mgr.conf.BcsNamespace
}
return DefaultNamespace
return hwu.user
}

// resourceID derives the resource identifier used for the given task id.
// Ids that normal.IsOldTaskType recognizes as old-style are first prefixed
// with the handler's user ("<user>-<id>"); every id is then lower-cased and
// has each "_" replaced with "-".
func (hwu *handlerWithUser) resourceID(id string) string {
	name := id
	if normal.IsOldTaskType(id) {
		// old task ids do not carry the user, so prepend it here
		name = fmt.Sprintf("%s-%s", hwu.user, id)
	}

	lowered := strings.ToLower(name)
	return strings.ReplaceAll(lowered, "_", "-")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,10 @@ func (o *operator) getFederationTotalNum(url string, ist config.InstanceType) (F

func (o *operator) getFederationResource(clusterID string) ([]*op.NodeInfo, error) {
nodeInfoList := make([]*op.NodeInfo, 0, 1000)
ns := o.conf.BcsNamespace
url := fmt.Sprintf(bcsAPIFederatedURI, o.conf.BcsAPIPool.GetAddress(), clusterID, ns)
if o.conf.BcsNamespace == "" {
return nil, fmt.Errorf("crm: get federation resource request failed clusterID(%s): namespace is nil", clusterID)
}
url := fmt.Sprintf(bcsAPIFederatedURI, o.conf.BcsAPIPool.GetAddress(), clusterID, o.conf.BcsNamespace)
for _, ist := range o.conf.InstanceType {
result, err := o.getFederationTotalNum(url, ist)
if err != nil {
Expand All @@ -379,8 +381,8 @@ func (o *operator) getFederationResource(clusterID string) ([]*op.NodeInfo, erro
}
totalIst := float64(result.Data.Total)
nodeInfoList = append(nodeInfoList, &op.NodeInfo{
IP: clusterID + "-" + ns + "-" + ist.Platform + "-" + ist.Group,
Hostname: clusterID + "-" + ns + "-" + ist.Platform + "-" + ist.Group,
IP: clusterID + "-" + o.conf.BcsNamespace + "-" + ist.Platform + "-" + ist.Group,
Hostname: clusterID + "-" + o.conf.BcsNamespace + "-" + ist.Platform + "-" + ist.Group,
DiskLeft: totalIst,
MemLeft: totalIst * ist.MemPerInstance,
CPULeft: totalIst * ist.CPUPerInstance,
Expand All @@ -399,12 +401,14 @@ func (o *operator) getServerStatus(clusterID, namespace, name string) (*op.Servi
info := &op.ServiceInfo{}

if err := o.getDeployments(clusterID, namespace, name, info); err != nil {
blog.Errorf("k8s-operator: get server status, get deployments failed: %v", err)
blog.Errorf("k8s-operator: get server status, get deployments clusterID(%s) namespace(%s) failed: %v",
clusterID, namespace, err)
return nil, err
}

if err := o.getPods(clusterID, namespace, name, info); err != nil {
blog.Errorf("k8s-operator: get server status, get pods failed: %v", err)
blog.Errorf("k8s-operator: get server status, get pods clusterID(%s) namespace(%s) failed: %v",
clusterID, namespace, err)
return nil, err
}

Expand Down Expand Up @@ -712,10 +716,12 @@ func (o *operator) getClientSetFromCache(clusterID string) (*clusterClientSet, b

func (o *operator) generateClient(clusterID string) (*clusterClientSet, error) {
address := o.conf.BcsAPIPool.GetAddress()
var host string
if o.conf.EnableBCSApiGw {
EnableBCSApiGw = "1"
host = fmt.Sprintf(bcsAPIGWK8SBaseURI, address, clusterID)
} else {
host = fmt.Sprintf(bcsAPIK8SBaseURI, address, clusterID)
}
host := fmt.Sprintf(getBcsK8SBaseURL(), address, clusterID)

blog.Infof("k8s-operator: try generate client with host(%s) token(%s)", host, o.conf.BcsAPIToken)
// get client set by real api-server address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,14 @@ func (o *operator) getServerStatus(clusterID, namespace, name string) (*op.Servi
info := &op.ServiceInfo{}

if err := o.getApplication(clusterID, namespace, name, info); err != nil {
blog.Errorf("get server status, get application failed: %v", err)
blog.Errorf("get server status, clusterId(%s), ns(%s), name(%s) get application failed: %v",
clusterID, namespace, name, err)
return nil, err
}

if err := o.getTaskGroup(clusterID, namespace, name, info); err != nil {
blog.Errorf("get server status, get taskGroup failed: %v", err)
blog.Errorf("get server status, clusterId(%s), ns(%s), name(%s) get taskGroup failed: %v",
clusterID, namespace, name, err)
return nil, err
}

Expand Down
13 changes: 7 additions & 6 deletions src/backend/booster/server/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (s *Server) initK8sResourceManagers() (k8sRm crm.ResourceManager,
curQueueMap := make(map[string]bool)
k8sRmList = make(map[string]crm.ResourceManager)
k8sconfList := s.conf.K8sResourceConfigList
if k8sconfList.K8sClusterList != nil {
if k8sconfList.Enable && k8sconfList.K8sClusterList != nil {
for key, confItem := range k8sconfList.K8sClusterList {
if confItem.MySQLStorage == "" {
confItem.MySQLStorage = k8sconfList.MySQLStorage
Expand Down Expand Up @@ -452,13 +452,14 @@ func (s *Server) initK8sResourceManagers() (k8sRm crm.ResourceManager,
}

k8sConf := s.conf.K8sContainerResourceConfig
for queueName, istItem := range k8sQueueIstList {
if !curQueueMap[queueName] {
initInstanceType(&k8sConf, istItem)
if k8sConf.Enable {
for queueName, istItem := range k8sQueueIstList {
if !curQueueMap[queueName] {
initInstanceType(&k8sConf, istItem)
}
}
k8sRm, err = s.initContainerResourceManager(&k8sConf, s.rd)
}

k8sRm, err = s.initContainerResourceManager(&k8sConf, s.rd)
return
}

Expand Down

0 comments on commit 2d0f59e

Please sign in to comment.