Skip to content

Commit

Permalink
GROUNDWORK-2556 k8s: process collecting errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavlo Sumkin committed Oct 12, 2023
1 parent 3dbd724 commit 7dfacf8
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 27 deletions.
85 changes: 60 additions & 25 deletions connectors/kubernetes-connector/kubernetesConnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,19 +231,34 @@ func (connector *KubernetesConnector) Ping() error {
return nil
}

// Collect inventory and metrics for all kinds of Kubernetes resources. Sort resources into groups and return inventory of host resources and inventory of groups
func (connector *KubernetesConnector) Collect() ([]transit.InventoryResource, []transit.MonitoredResource, []transit.ResourceGroup) {
// Collect inventory and metrics for all kinds of Kubernetes resources.
// Sort resources into groups and return inventory of host resources and inventory of groups
func (connector *KubernetesConnector) Collect() (
[]transit.InventoryResource,
[]transit.MonitoredResource,
[]transit.ResourceGroup,
error) {
// gather inventory and Metrics
metricsPerContainer := true
monitoredState := make(map[string]KubernetesResource)
groups := make(map[string]transit.ResourceGroup)
connector.collectNodeInventory(monitoredState, groups)
connector.collectPodInventory(monitoredState, groups, &metricsPerContainer)
connector.collectNodeMetrics(monitoredState)
if err := connector.collectNodeInventory(monitoredState, groups); err != nil {
return nil, nil, nil, err
}
if err := connector.collectPodInventory(monitoredState, groups, &metricsPerContainer); err != nil {
return nil, nil, nil, err
}
if err := connector.collectNodeMetrics(monitoredState); err != nil {
return nil, nil, nil, err
}
if metricsPerContainer {
connector.collectPodMetricsPerContainer(monitoredState)
if err := connector.collectPodMetricsPerContainer(monitoredState); err != nil {
return nil, nil, nil, err
}
} else {
connector.collectPodMetricsPerReplica(monitoredState)
if err := connector.collectPodMetricsPerReplica(monitoredState); err != nil {
return nil, nil, nil, err
}
}

// convert to arrays as expected by TCG
Expand Down Expand Up @@ -287,7 +302,7 @@ func (connector *KubernetesConnector) Collect() ([]transit.InventoryResource, []
slices.SortFunc(hostGroups, func(a, b transit.ResourceGroup) int { return cmp.Compare(a.GroupName, b.GroupName) })
slices.SortFunc(inventory, func(a, b transit.InventoryResource) int { return cmp.Compare(a.Name, b.Name) })

return inventory, monitored, hostGroups
return inventory, monitored, hostGroups, nil
}

// Node Inventory also retrieves status, capacity, and allocations
Expand All @@ -303,8 +318,16 @@ func (connector *KubernetesConnector) Collect() ([]transit.InventoryResource, []
// (v1.ResourceName) (len=4) pods: (resource.Quantity) 17,
// (v1.ResourceName) (len=3) cpu: (resource.Quantity) 1930m
// (v1.ResourceName) (len=17) ephemeral-storage: (resource.Quantity) 18242267924,
func (connector *KubernetesConnector) collectNodeInventory(monitoredState map[string]KubernetesResource, groups map[string]transit.ResourceGroup) {
nodes, _ := connector.kapi.Nodes().List(connector.ctx, metav1.ListOptions{}) // TODO: ListOptions can filter by label
func (connector *KubernetesConnector) collectNodeInventory(
monitoredState map[string]KubernetesResource,
groups map[string]transit.ResourceGroup) error {
// TODO: ListOptions can filter by label
nodes, err := connector.kapi.Nodes().List(connector.ctx, metav1.ListOptions{})
if err != nil {
log.Err(err).Msg("could not collect nodes inventory")
return err
}

for _, node := range nodes.Items {
resourceName := node.Name
labels := GetLabels(node)
Expand Down Expand Up @@ -391,19 +414,23 @@ func (connector *KubernetesConnector) collectNodeInventory(monitoredState map[st
}
monitoredState[fmt.Sprintf("node:%v:%v", namespace, node.Name)] = resource
}
return nil
}

// Pod Inventory also retrieves status
// inventory also contains status, pod counts, capacity and allocation metrics
func (connector *KubernetesConnector) collectPodInventory(monitoredState map[string]KubernetesResource, groups map[string]transit.ResourceGroup, metricsPerContainer *bool) {
func (connector *KubernetesConnector) collectPodInventory(
monitoredState map[string]KubernetesResource,
groups map[string]transit.ResourceGroup,
metricsPerContainer *bool) error {
// TODO: filter pods by namespace(s)
groupsMap := make(map[string]bool)
pods, err := connector.kapi.Pods("").List(connector.ctx, metav1.ListOptions{})
if err != nil {
log.Err(err).Msg("could not collect pod inventory")
return
log.Err(err).Msg("could not collect pods inventory")
return err
}

groupsMap := make(map[string]bool)
addResource := func(
stateKey string,
resourceName string,
Expand Down Expand Up @@ -505,13 +532,15 @@ func (connector *KubernetesConnector) collectPodInventory(monitoredState map[str
groupName)
}
}
return nil
}

func (connector *KubernetesConnector) collectNodeMetrics(monitoredState map[string]KubernetesResource) {
nodes, err := connector.mapi.NodeMetricses().List(connector.ctx, metav1.ListOptions{}) // TODO: filter by namespace
func (connector *KubernetesConnector) collectNodeMetrics(monitoredState map[string]KubernetesResource) error {
// TODO: filter by namespace
nodes, err := connector.mapi.NodeMetricses().List(connector.ctx, metav1.ListOptions{})
if err != nil {
log.Err(err).Msg("could not collect node metrics")
return
log.Err(err).Msg("could not collect nodes metrics")
return err
}

for _, node := range nodes.Items {
Expand Down Expand Up @@ -554,14 +583,17 @@ func (connector *KubernetesConnector) collectNodeMetrics(monitoredState map[stri
log.Warn().Msgf("node not found in monitored state: %v", stateKey)
}
}
return nil
}

func (connector *KubernetesConnector) collectPodMetricsPerReplica(monitoredState map[string]KubernetesResource) {
pods, err := connector.mapi.PodMetricses("").List(connector.ctx, metav1.ListOptions{}) // TODO: filter by namespace
func (connector *KubernetesConnector) collectPodMetricsPerReplica(monitoredState map[string]KubernetesResource) error {
// TODO: filter by namespace
pods, err := connector.mapi.PodMetricses("").List(connector.ctx, metav1.ListOptions{})
if err != nil {
log.Err(err).Msg("could not collect pod metrics")
return
log.Err(err).Msg("could not collect pods metrics")
return err
}

for _, pod := range pods.Items {
stateKey := fmt.Sprintf("pod:%v:%v", GetLabels(pod)["namespace"], pod.Name)
if resource, ok := monitoredState[stateKey]; ok {
Expand Down Expand Up @@ -616,14 +648,16 @@ func (connector *KubernetesConnector) collectPodMetricsPerReplica(monitoredState
log.Warn().Msgf("pod not found in monitored state: %v", stateKey)
}
}
return nil
}

// treat each container uniquely -- store multi-metrics per pod replica for each node
func (connector *KubernetesConnector) collectPodMetricsPerContainer(monitoredState map[string]KubernetesResource) {
pods, err := connector.mapi.PodMetricses("").List(connector.ctx, metav1.ListOptions{}) // TODO: filter by namespace
func (connector *KubernetesConnector) collectPodMetricsPerContainer(monitoredState map[string]KubernetesResource) error {
// TODO: filter by namespace
pods, err := connector.mapi.PodMetricses("").List(connector.ctx, metav1.ListOptions{})
if err != nil {
log.Err(err).Msg("could not collect pod metrics")
return
return err
}
debugDetails := false
builderMap := make(map[string][]connectors.MetricBuilder)
Expand Down Expand Up @@ -704,6 +738,7 @@ func (connector *KubernetesConnector) collectPodMetricsPerContainer(monitoredSta
if debugDetails {
log.Debug().Interface("monitoredState", monitoredState).Send()
}
return nil
}

// Calculate Node Status based on Conditions, PID Pressure, memory Pressure, Disk Pressure all treated as default
Expand Down
7 changes: 5 additions & 2 deletions connectors/kubernetes-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,11 @@ func periodicHandler() {
}
}

inventory, monitored, groups := connector.Collect()
log.Debug().Msgf("Collected %d:%d:%d", len(inventory), len(monitored), len(groups))
inventory, monitored, groups, err := connector.Collect()
if err != nil {
log.Err(err).Msg("Collecting data")
return
}

if chk, err := connectors.Hashsum(inventory, groups); err != nil || !bytes.Equal(connector.iChksum, chk) {
if err == nil {
Expand Down

0 comments on commit 7dfacf8

Please sign in to comment.