diff --git a/connectors/events-connector/helpers/builder.go b/connectors/events-connector/helpers/builder.go index e12e12be..d03b6259 100644 --- a/connectors/events-connector/helpers/builder.go +++ b/connectors/events-connector/helpers/builder.go @@ -37,19 +37,19 @@ func ParsePrometheusData(data template.Data, cfg *ExtConfig) ([]ParseResult, err tags[k] = v } - hostGroupName, err := cfg.HostGroupMappings.Apply(tags) + hostGroupName, err := cfg.MapHostgroup.ApplyOR(tags) if err != nil { - log.Debug().Err(err).Interface("tags", tags).Send() + log.Debug().Err(err).Interface("tags", tags).Interface("mappings", cfg.MapHostgroup).Send() continue } - hostName, err := cfg.HostMappings.Apply(tags) + hostName, err := cfg.MapHostname.ApplyOR(tags) if err != nil || hostName == "" { - log.Debug().Err(err).Interface("tags", tags).Send() + log.Debug().Err(err).Interface("tags", tags).Interface("mappings", cfg.MapHostname).Send() continue } - serviceName, err := cfg.ServiceMappings.Apply(tags) + serviceName, err := cfg.MapService.ApplyOR(tags) if err != nil || serviceName == "" { - log.Debug().Err(err).Interface("tags", tags).Send() + log.Debug().Err(err).Interface("tags", tags).Interface("mappings", cfg.MapService).Send() continue } diff --git a/connectors/events-connector/helpers/config.go b/connectors/events-connector/helpers/config.go index d3cb09fc..a0905c98 100644 --- a/connectors/events-connector/helpers/config.go +++ b/connectors/events-connector/helpers/config.go @@ -21,9 +21,9 @@ var ( ) type ExtConfig struct { - HostMappings mapping.Mappings `json:"hostMappings"` - HostGroupMappings mapping.Mappings `json:"hostGroupMappings"` - ServiceMappings mapping.Mappings `json:"serviceMappings"` + MapHostgroup mapping.Mappings `json:"mapHostgroup"` + MapHostname mapping.Mappings `json:"mapHostname"` + MapService mapping.Mappings `json:"mapService"` } func GetExtConfig() *ExtConfig { @@ -54,15 +54,15 @@ func ConfigHandler(data []byte) { } /* Update config with received values */ extConfig, metricsProfile, monitorConnection = tExt, tMetProf, tMonConn - if err := extConfig.HostMappings.Compile(); err != nil { - log.Err(err).Msg("failed to compile host mappings") + if err := extConfig.MapHostgroup.Compile(); err != nil { + log.Err(err).Msg("failed to compile host group mappings") return } - if err := extConfig.HostGroupMappings.Compile(); err != nil { - log.Err(err).Msg("failed to compile host group mappings") + if err := extConfig.MapHostname.Compile(); err != nil { + log.Err(err).Msg("failed to compile host mappings") return } - if err := extConfig.ServiceMappings.Compile(); err != nil { + if err := extConfig.MapService.Compile(); err != nil { log.Err(err).Msg("failed to compile service mappings") return } diff --git a/connectors/kubernetes-connector/kubernetesConnector.go b/connectors/kubernetes-connector/kubernetesConnector.go index 45cbed03..e1d61226 100644 --- a/connectors/kubernetes-connector/kubernetesConnector.go +++ b/connectors/kubernetes-connector/kubernetesConnector.go @@ -1,12 +1,16 @@ package main import ( + "cmp" "context" "errors" + "fmt" + "slices" "strings" "time" "github.com/gwos/tcg/connectors" + "github.com/gwos/tcg/sdk/mapping" "github.com/gwos/tcg/sdk/transit" "github.com/rs/zerolog/log" "gopkg.in/yaml.v3" @@ -15,6 +19,7 @@ import ( "k8s.io/client-go/kubernetes" kv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" metricsApi "k8s.io/metrics/pkg/client/clientset/versioned" mv1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1" ) @@ -34,6 +39,33 @@ type ExtConfig struct { KubernetesUserPassword string `json:"kubernetesUserPassword,omitempty"` KubernetesBearerToken string `json:"kubernetesBearerToken,omitempty"` KubernetesConfigFile string `json:"kubernetesConfigFile,omitempty"` + + GWMapping +} + +type GWMapping struct { + HostGroup mapping.Mappings `json:"mapHostgroup"` + HostName mapping.Mappings `json:"mapHostname"` +} + +// Prepare compiles mappings +func (m *GWMapping) Prepare() { + var hg, hn mapping.Mappings + for i := range m.HostGroup { + if err := m.HostGroup[i].Compile(); err != nil { + log.Warn().Err(err).Interface("mapping", m.HostGroup[i]).Msg("could not prepare mapping") + continue + } + hg = append(hg, m.HostGroup[i]) + } + for i := range m.HostName { + if err := m.HostName[i].Compile(); err != nil { + log.Warn().Err(err).Interface("mapping", m.HostName[i]).Msg("could not prepare mapping") + continue + } + hn = append(hn, m.HostName[i]) + } + m.HostGroup, m.HostName = hg, hn } type KubernetesView string @@ -53,10 +85,10 @@ const ( ) const ( - ClusterHostGroup = "cluster-" - ClusterNameLabel = "alpha.eksctl.io/cluster-name" ContainerPOD = "POD" - PodsHostGroup = "pods-" + ClusterNameLabel = "alpha.eksctl.io/cluster-name" + // ClusterHostGroup = "cluster-" + // PodsHostGroup = "pods-" defaultKubernetesClusterEndpoint = "" ) @@ -224,6 +256,7 @@ func (connector *KubernetesConnector) Collect() ([]transit.InventoryResource, [] for _, service := range resource.Services { services = append(services, connectors.CreateInventoryService(service.Name, service.Owner)) } + slices.SortFunc(services, func(a, b transit.InventoryService) int { return cmp.Compare(a.Name, b.Name) }) inventory = append(inventory, connectors.CreateInventoryResource(resource.Name, services)) // convert monitored state mServices := make([]transit.MonitoredService, 0, len(resource.Services)) @@ -247,8 +280,13 @@ func (connector *KubernetesConnector) Collect() ([]transit.InventoryResource, [] }) } for _, group := range groups { + slices.SortFunc(group.Resources, func(a, b transit.ResourceRef) int { return cmp.Compare(a.Name, b.Name) }) hostGroups = append(hostGroups, group) } + /* sort inventory data structures to allow checksums */ + slices.SortFunc(hostGroups, func(a, b transit.ResourceGroup) int { return cmp.Compare(a.GroupName, b.GroupName) }) + slices.SortFunc(inventory, func(a, b transit.InventoryResource) int { return cmp.Compare(a.Name, b.Name) }) + return inventory, monitored, hostGroups } @@ -267,28 +305,50 @@ func (connector *KubernetesConnector) Collect() ([]transit.InventoryResource, [] // (v1.ResourceName) (len=17) ephemeral-storage: (resource.Quantity) 18242267924, func (connector *KubernetesConnector) collectNodeInventory(monitoredState map[string]KubernetesResource, groups map[string]transit.ResourceGroup) { nodes, _ := connector.kapi.Nodes().List(connector.ctx, metav1.ListOptions{}) // TODO: ListOptions can filter by label - clusterHostGroupName := connector.makeClusterName(nodes) - groups[clusterHostGroupName] = transit.ResourceGroup{ - GroupName: clusterHostGroupName, - Type: transit.HostGroup, - Resources: make([]transit.ResourceRef, len(nodes.Items)), - } + for _, node := range nodes.Items { + resourceName := node.Name + labels := GetLabels(node) + namespace := labels["namespace"] + // apply mapping and skip unmatched entries + if len(connector.ExtConfig.GWMapping.HostName) > 0 { + var err error + resourceName, err = connector.ExtConfig.GWMapping.HostName.ApplyOR(labels) + if err != nil || resourceName == "" { + log.Debug().Err(err). + Interface("labels", labels).Interface("mappings", connector.ExtConfig.GWMapping.HostName). + Msg("could not map hostname on node") + continue + } + } + groupName, err := connector.ExtConfig.GWMapping.HostGroup.ApplyOR(labels) + if err != nil || groupName == "" { + groupName = "nodes-" + namespace + log.Debug().Err(err). + Interface("labels", labels).Interface("mappings", connector.ExtConfig.GWMapping.HostGroup). + Msg("could not map hostgroup on node, adding to nodes-namespace group") + } - for index, node := range nodes.Items { - labels := make(map[string]string) - for key, element := range node.Labels { - labels[key] = element + rf := transit.ResourceRef{Name: resourceName, Owner: groupName, Type: transit.ResourceTypeHost} + if group, ok := groups[groupName]; ok { + group.Resources = append(group.Resources, rf) + groups[groupName] = group + } else { + groups[groupName] = transit.ResourceGroup{ + GroupName: groupName, + Resources: []transit.ResourceRef{rf}, + Type: transit.HostGroup, + } } + monitorStatus, message := connector.calculateNodeStatus(&node) resource := KubernetesResource{ - Name: node.Name, + Name: resourceName, Type: transit.ResourceTypeHost, Status: monitorStatus, Message: message, Labels: labels, Services: make(map[string]transit.MonitoredService), } - monitoredState[resource.Name] = resource // process services for key, metricDefinition := range connector.ExtConfig.Views[ViewNodes] { var value interface{} @@ -329,12 +389,7 @@ func (connector *KubernetesConnector) collectNodeInventory(monitoredState map[st resource.Services[metricBuilder.Name] = *monitoredService } } - // add to default Cluster group - groups[clusterHostGroupName].Resources[index] = transit.ResourceRef{ - Name: resource.Name, - Owner: clusterHostGroupName, - Type: transit.ResourceTypeHost, - } + monitoredState[fmt.Sprintf("node:%v:%v", namespace, node.Name)] = resource } } @@ -355,7 +410,7 @@ func (connector *KubernetesConnector) collectPodInventory(monitoredState map[str monitorStatus transit.MonitorStatus, message string, labels map[string]string, - podHostGroup string, + groupName string, ) { resource := KubernetesResource{ Name: resourceName, @@ -372,51 +427,82 @@ func (connector *KubernetesConnector) collectPodInventory(monitoredState map[str return } groupsMap[resource.Name] = true - // add to namespace group for starters, need to consider namespace filtering - if group, ok := groups[podHostGroup]; ok { - group.Resources = append(group.Resources, transit.ResourceRef{ - Name: resource.Name, - Owner: group.GroupName, - Type: transit.ResourceTypeHost, - }) - groups[podHostGroup] = group + rf := transit.ResourceRef{Name: resource.Name, Owner: groupName, Type: transit.ResourceTypeHost} + if group, ok := groups[groupName]; ok { + group.Resources = append(group.Resources, rf) + groups[groupName] = group } else { - group = transit.ResourceGroup{ - GroupName: podHostGroup, + groups[groupName] = transit.ResourceGroup{ + GroupName: groupName, + Resources: []transit.ResourceRef{rf}, Type: transit.HostGroup, - Resources: make([]transit.ResourceRef, 0), } - group.Resources = append(group.Resources, transit.ResourceRef{ - Name: resource.Name, - Owner: group.GroupName, - Type: transit.ResourceTypeHost, - }) - groups[podHostGroup] = group } } for _, pod := range pods.Items { - labels := make(map[string]string) - for key, element := range pod.Labels { - labels[key] = element - } - monitorStatus, message := connector.calculatePodStatus(&pod) + resourceName := pod.Name if *metricsPerContainer { for _, container := range pod.Spec.Containers { - resourceName := strings.TrimSuffix(container.Name, "-") - if resourceName == ContainerPOD { + if ContainerPOD == strings.TrimSuffix(container.Name, "-") { continue } - addResource(pod.Name+"/"+resourceName, + + resourceName := container.Name + labels := GetLabels(pod, container) + namespace := labels["namespace"] + // apply mapping and skip unmatched entries + if len(connector.ExtConfig.GWMapping.HostName) > 0 { + var err error + resourceName, err = connector.ExtConfig.GWMapping.HostName.ApplyOR(labels) + if err != nil || resourceName == "" { + log.Debug().Err(err). + Interface("labels", labels).Interface("mappings", connector.ExtConfig.GWMapping.HostName). + Msg("could not map hostname on pod container") + continue + } + } + groupName, err := connector.ExtConfig.GWMapping.HostGroup.ApplyOR(labels) + if err != nil || groupName == "" { + groupName = "pods-" + namespace + log.Debug().Err(err). + Interface("labels", labels).Interface("mappings", connector.ExtConfig.GWMapping.HostGroup). + Msg("could not map hostgroup on pod container, adding to pods-namespace group") + } + + stateKey := fmt.Sprintf("pod:%v:%v:%v", namespace, pod.Name, container.Name) + addResource(stateKey, resourceName, monitorStatus, message, labels, - PodsHostGroup+pod.Namespace) + groupName) } } else { - addResource(pod.Name, - pod.Name, monitorStatus, message, labels, - PodsHostGroup+pod.Namespace) + labels := GetLabels(pod) + namespace := labels["namespace"] + // apply mapping and skip unmatched entries + if len(connector.ExtConfig.GWMapping.HostName) > 0 { + var err error + resourceName, err = connector.ExtConfig.GWMapping.HostName.ApplyOR(labels) + if err != nil || resourceName == "" { + log.Debug().Err(err). + Interface("labels", labels).Interface("mappings", connector.ExtConfig.GWMapping.HostName). + Msg("could not map hostname on pod") + continue + } + } + groupName, err := connector.ExtConfig.GWMapping.HostGroup.ApplyOR(labels) + if err != nil || groupName == "" { + groupName = "pods-" + namespace + log.Debug().Err(err). + Interface("labels", labels).Interface("mappings", connector.ExtConfig.GWMapping.HostGroup). + Msg("could not map hostgroup on pod, adding to pods-namespace group") + } + + stateKey := fmt.Sprintf("pod:%v:%v", namespace, pod.Name) + addResource(stateKey, + resourceName, monitorStatus, message, labels, + groupName) } } } @@ -429,7 +515,8 @@ func (connector *KubernetesConnector) collectNodeMetrics(monitoredState map[stri } for _, node := range nodes.Items { - if resource, ok := monitoredState[node.Name]; ok { + stateKey := fmt.Sprintf("node:%v:%v", GetLabels(node)["namespace"], node.Name) + if resource, ok := monitoredState[stateKey]; ok { for key, metricDefinition := range connector.ExtConfig.Views[ViewNodes] { var value interface{} switch key { @@ -454,16 +541,17 @@ func (connector *KubernetesConnector) collectNodeMetrics(monitoredState map[stri metricBuilder.StartTimestamp = &transit.Timestamp{Time: node.Timestamp.Time.UTC()} metricBuilder.EndTimestamp = &transit.Timestamp{Time: node.Timestamp.Time.UTC()} customServiceName := connectors.Name(metricBuilder.Name, metricDefinition.CustomName) - monitoredService, err := connectors.BuildServiceForMetric(node.Name, metricBuilder) + monitoredService, err := connectors.BuildServiceForMetric(resource.Name, metricBuilder) if err != nil { - log.Err(err).Msgf("could not create service %s:%s", node.Name, customServiceName) + log.Err(err).Msgf("could not create service %v:%v", resource.Name, customServiceName) } if monitoredService != nil { resource.Services[metricBuilder.Name] = *monitoredService + monitoredState[stateKey] = resource } } } else { - log.Error().Msgf("node not found in monitored state: %s", node.Name) + log.Warn().Msgf("node not found in monitored state: %v", stateKey) } } } @@ -475,7 +563,8 @@ func (connector *KubernetesConnector) collectPodMetricsPerReplica(monitoredState return } for _, pod := range pods.Items { - if resource, ok := monitoredState[pod.Name]; ok { + stateKey := fmt.Sprintf("pod:%v:%v", GetLabels(pod)["namespace"], pod.Name) + if resource, ok := monitoredState[stateKey]; ok { for index, container := range pod.Containers { if container.Name == ContainerPOD { continue @@ -515,15 +604,16 @@ func (connector *KubernetesConnector) collectPodMetricsPerReplica(monitoredState metricBuilders = append(metricBuilders, metricBuilder) monitoredService, err := connectors.BuildServiceForMultiMetric(container.Name, metricDefinition.Name, metricDefinition.CustomName, metricBuilders) if err != nil { - log.Err(err).Msgf("could not create service %s:%s", pod.Name, metricDefinition.Name) + log.Err(err).Msgf("could not create service %v:%v", stateKey, metricDefinition.Name) } if monitoredService != nil { resource.Services[metricBuilder.Name] = *monitoredService + monitoredState[stateKey] = resource } } } } else { - log.Error().Msgf("pod not found in monitored state: %s", pod.Name) + log.Warn().Msgf("pod not found in monitored state: %v", stateKey) } } } @@ -544,7 +634,9 @@ func (connector *KubernetesConnector) collectPodMetricsPerContainer(monitoredSta if container.Name == ContainerPOD { continue } - if resource, ok := monitoredState[pod.Name+"/"+container.Name]; ok { + + stateKey := fmt.Sprintf("pod:%v:%v:%v", GetLabels(pod, container)["namespace"], pod.Name, container.Name) + if resource, ok := monitoredState[stateKey]; ok { var value interface{} switch key { case cpuCores: @@ -600,9 +692,10 @@ func (connector *KubernetesConnector) collectPodMetricsPerContainer(monitoredSta } if monitoredService != nil { resource.Services[metricDefinition.Name] = *monitoredService + monitoredState[stateKey] = resource } } else { - log.Error().Msgf("pod container not found in monitored state: %s/%s", pod.Name, container.Name) + log.Warn().Msgf("pod container not found in monitored state: %v", stateKey) debugDetails = true } } @@ -668,15 +761,6 @@ func (connector *KubernetesConnector) calculatePodStatus(pod *v1.Pod) (transit.M return status, message.String() } -func (connector *KubernetesConnector) makeClusterName(nodes *v1.NodeList) string { - if len(nodes.Items) > 0 { - if value, ok := nodes.Items[0].Labels[ClusterNameLabel]; ok { - return ClusterHostGroup + value - } - } - return ClusterHostGroup + "1" -} - // toPercentage - converts CPU from cores to percentage // // 1 core = 1000 Millicores = 100% @@ -692,3 +776,42 @@ func (connector *KubernetesConnector) makeClusterName(nodes *v1.NodeList) string func toPercentage(capacityMilliValue, allocatableMilliValue int64) float64 { return float64(allocatableMilliValue) / float64(capacityMilliValue) * 100 } + +func GetLabels(a ...interface{}) map[string]string { + labels := map[string]string{ + "cluster": "default", + "namespace": "default", + } + for _, v := range a { + switch v := v.(type) { + case v1.Container: + labels["container_name"] = v.Name + case v1beta1.ContainerMetrics: + labels["container_name"] = v.Name + case v1.Node: + labels["node_name"] = v.Name + case v1beta1.NodeMetrics: + labels["node_name"] = v.Name + case v1.Pod: + labels["pod_name"] = v.Name + case v1beta1.PodMetrics: + labels["pod_name"] = v.Name + } + if v, ok := v.(interface{ GetLabels() map[string]string }); ok { + for key, element := range v.GetLabels() { + labels[key] = element + } + if value, ok := labels[ClusterNameLabel]; ok { + labels["cluster"] = value + } + } + if v, ok := v.(interface{ GetNamespace() string }); ok { + ns := v.GetNamespace() + if ns != "" { + labels["namespace"] = ns + } + } + } + + return labels +} diff --git a/connectors/kubernetes-connector/main.go b/connectors/kubernetes-connector/main.go index e3e9b1b6..8206a6e9 100644 --- a/connectors/kubernetes-connector/main.go +++ b/connectors/kubernetes-connector/main.go @@ -94,6 +94,7 @@ func configHandler(data []byte) { } /* Update config with received values */ + tExt.GWMapping.Prepare() tExt.Views[ViewNodes] = buildNodeMetricsMap(tMetProf.Metrics) tExt.Views[ViewPods] = buildPodMetricsMap(tMetProf.Metrics) diff --git a/sdk/mapping/mapping.go b/sdk/mapping/mapping.go index 426092ff..dde37183 100644 --- a/sdk/mapping/mapping.go +++ b/sdk/mapping/mapping.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "regexp" + "strings" ) var ( @@ -65,6 +66,44 @@ func (p Mappings) Apply(tags map[string]string) (string, error) { return string(result), nil } +func (p Mappings) ApplyOR(tags map[string]string) (string, error) { + mismatched := false +LOOP_OR: + for _, mapping := range p { + if mapping.Tag == "" { + return mapping.Template, nil + } + + var vals []string + for _, key := range strings.Split(mapping.Tag, ",") { + if val, ok := tags[strings.TrimSpace(key)]; ok { + vals = append(vals, val) + } else { + continue LOOP_OR + } + } + content := strings.Join(vals, ",") + matches := mapping.matcher.FindAllStringSubmatchIndex(content, -1) + if matches == nil { + mismatched = true + continue LOOP_OR + } + result := []byte{} + for _, submatches := range matches { + result = mapping.matcher.ExpandString(result, mapping.Template, content, submatches) + } + return string(result), nil + } + + if mismatched { + return "", ErrMappingMismatchedTag + } + if len(p) > 0 { + return "", ErrMappingMissedTag + } + return "", nil +} + // Compile compiles mappings matchers. func (p Mappings) Compile() error { for i := range p { diff --git a/sdk/mapping/mapping_test.go b/sdk/mapping/mapping_test.go index ffe83538..3d393d71 100644 --- a/sdk/mapping/mapping_test.go +++ b/sdk/mapping/mapping_test.go @@ -2,6 +2,7 @@ package mapping import ( "encoding/json" + "strings" "testing" ) @@ -863,6 +864,198 @@ func TestMappings(t *testing.T) { } } +func TestMappingsApplyOR(t *testing.T) { + tagMaps := testData() + + error2string := func(err error) string { + if err != nil { + return err.Error() + } + return "" + } + + type FnRes struct{ str, err string } + type TestCase struct { + Name string + Mappings string + Results []FnRes + } + + cases := []TestCase{ + {"pod_or_node", + `{"mappings":[{"tag":"pod_name","matcher":"(.*)","template":"pod-$1"},{"tag":"node_name","matcher":"(.*)","template":"node-$1"}]}`, + []FnRes{ + {"node-minikube", ""}, + {"node-minikube", ""}, + {"node-minikube", ""}, + {"node-minikube", ""}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"pod-metrics-server-6796b9d575-b26t8", ""}, + {"pod-kube-proxy-kvdm2", ""}, + {"pod-coredns-bd6b6df9f-5pjvl", ""}, + {"pod-storage-provisioner", ""}, + {"pod-kube-apiserver-minikube", ""}, + {"pod-kube-scheduler-minikube", ""}, + {"pod-etcd-minikube", ""}, + {"pod-picasa-connector-wp7hc", ""}, + {"pod-coredns-bd6b6df9f-hfc84", ""}, + {"pod-calico-node-xx6kj", ""}, + {"pod-kube-controller-manager-minikube", ""}, + {"pod-calico-kube-controllers-7b8458594b-chv99", ""}, + {"pod-registry-t86w2", ""}, + {"pod-registry-proxy-ks9xb", ""}, + {"pod-storage-provisioner", ""}, + {"pod-storage-provisioner", ""}, + {"pod-storage-provisioner", ""}, + {"pod-metrics-server-6796b9d575-b26t8", ""}, + {"pod-metrics-server-6796b9d575-b26t8", ""}, + {"pod-metrics-server-6796b9d575-b26t8", ""}, + {"pod-metrics-server-6796b9d575-b26t8", ""}, + {"pod-registry-proxy-ks9xb", ""}, + {"pod-registry-proxy-ks9xb", ""}, + {"pod-registry-proxy-ks9xb", ""}, + {"pod-kube-controller-manager-minikube", ""}, + {"pod-kube-controller-manager-minikube", ""}, + {"pod-kube-scheduler-minikube", ""}, + {"pod-kube-scheduler-minikube", ""}, + {"pod-kube-apiserver-minikube", ""}, + {"pod-kube-apiserver-minikube", ""}, + {"pod-coredns-bd6b6df9f-hfc84", ""}, + {"pod-coredns-bd6b6df9f-hfc84", ""}, + {"pod-coredns-bd6b6df9f-hfc84", ""}, + {"pod-coredns-bd6b6df9f-hfc84", ""}, + {"pod-calico-node-xx6kj", ""}, + {"pod-calico-node-xx6kj", ""}, + {"pod-calico-node-xx6kj", ""}, + {"pod-picasa-connector-wp7hc", ""}, + {"pod-picasa-connector-wp7hc", ""}, + {"pod-picasa-connector-wp7hc", ""}, + {"pod-picasa-connector-wp7hc", ""}, + {"pod-etcd-minikube", ""}, + {"pod-etcd-minikube", ""}, + {"pod-calico-kube-controllers-7b8458594b-chv99", ""}, + {"pod-registry-t86w2", ""}, + {"pod-calico-kube-controllers-7b8458594b-chv99", ""}, + {"pod-calico-kube-controllers-7b8458594b-chv99", ""}, + {"pod-registry-t86w2", ""}, + {"pod-registry-t86w2", ""}, + {"pod-coredns-bd6b6df9f-5pjvl", ""}, + {"pod-coredns-bd6b6df9f-5pjvl", ""}, + {"pod-coredns-bd6b6df9f-5pjvl", ""}, + {"pod-coredns-bd6b6df9f-5pjvl", ""}, + {"pod-kube-proxy-kvdm2", ""}, + {"pod-kube-proxy-kvdm2", ""}, + {"pod-kube-proxy-kvdm2", ""}, + {"pod-kube-proxy-kvdm2", ""}, + }, + }, + + {"node_pod_or_missing_label", + `{"mappings":[{"tag":"node_name,pod_name","matcher":"(.*),(.*)","template":"$1-$2"},{"tag":"missing_label","matcher":"(.*)","template":"source-$1"}]}`, + []FnRes{ + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"", "mapping error: missed tag"}, + {"minikube-metrics-server-6796b9d575-b26t8", ""}, + {"minikube-kube-proxy-kvdm2", ""}, + {"minikube-coredns-bd6b6df9f-5pjvl", ""}, + {"minikube-storage-provisioner", ""}, + {"minikube-kube-apiserver-minikube", ""}, + {"minikube-kube-scheduler-minikube", ""}, + {"minikube-etcd-minikube", ""}, + {"minikube-picasa-connector-wp7hc", ""}, + {"minikube-coredns-bd6b6df9f-hfc84", ""}, + {"minikube-calico-node-xx6kj", ""}, + {"minikube-kube-controller-manager-minikube", ""}, + {"minikube-calico-kube-controllers-7b8458594b-chv99", ""}, + {"minikube-registry-t86w2", ""}, + {"minikube-registry-proxy-ks9xb", ""}, + {"minikube-storage-provisioner", ""}, + {"minikube-storage-provisioner", ""}, + {"minikube-storage-provisioner", ""}, + {"minikube-metrics-server-6796b9d575-b26t8", ""}, + {"minikube-metrics-server-6796b9d575-b26t8", ""}, + {"minikube-metrics-server-6796b9d575-b26t8", ""}, + {"minikube-metrics-server-6796b9d575-b26t8", ""}, + {"minikube-registry-proxy-ks9xb", ""}, + {"minikube-registry-proxy-ks9xb", ""}, + {"minikube-registry-proxy-ks9xb", ""}, + {"minikube-kube-controller-manager-minikube", ""}, + {"minikube-kube-controller-manager-minikube", ""}, + {"minikube-kube-scheduler-minikube", ""}, + {"minikube-kube-scheduler-minikube", ""}, + {"minikube-kube-apiserver-minikube", ""}, + {"minikube-kube-apiserver-minikube", ""}, + {"minikube-coredns-bd6b6df9f-hfc84", ""}, + {"minikube-coredns-bd6b6df9f-hfc84", ""}, + {"minikube-coredns-bd6b6df9f-hfc84", ""}, + {"minikube-coredns-bd6b6df9f-hfc84", ""}, + {"minikube-calico-node-xx6kj", ""}, + {"minikube-calico-node-xx6kj", ""}, + {"minikube-calico-node-xx6kj", ""}, + {"minikube-picasa-connector-wp7hc", ""}, + {"minikube-picasa-connector-wp7hc", ""}, + {"minikube-picasa-connector-wp7hc", ""}, + {"minikube-picasa-connector-wp7hc", ""}, + {"minikube-etcd-minikube", ""}, + {"minikube-etcd-minikube", ""}, + {"minikube-calico-kube-controllers-7b8458594b-chv99", ""}, + {"minikube-registry-t86w2", ""}, + {"minikube-calico-kube-controllers-7b8458594b-chv99", ""}, + {"minikube-calico-kube-controllers-7b8458594b-chv99", ""}, + {"minikube-registry-t86w2", ""}, + {"minikube-registry-t86w2", ""}, + {"minikube-coredns-bd6b6df9f-5pjvl", ""}, + {"minikube-coredns-bd6b6df9f-5pjvl", ""}, + {"minikube-coredns-bd6b6df9f-5pjvl", ""}, + {"minikube-coredns-bd6b6df9f-5pjvl", ""}, + {"minikube-kube-proxy-kvdm2", ""}, + {"minikube-kube-proxy-kvdm2", ""}, + {"minikube-kube-proxy-kvdm2", ""}, + {"minikube-kube-proxy-kvdm2", ""}, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + p := new(struct { + Mappings `json:"mappings"` + }) + if err := json.Unmarshal([]byte(tc.Mappings), p); err != nil { + t.Errorf("error unmarshalling mappings: %v", err) + } + if err := p.Mappings.Compile(); err != nil { + t.Errorf("error compiling mappings: %v", err) + } + + for i, tags := range tagMaps { + str, err := p.Mappings.ApplyOR(tags) + if tc.Results[i].str != str || !strings.HasPrefix(error2string(err), tc.Results[i].err) { + t.Errorf("error:\tnot equal:\n\texpected: %+v\n\tactual : %+v", + tc.Results[i], FnRes{str, error2string(err)}) + } + + // t.Logf("%s %d\t %+v", t.Name(), i, FnRes{str, error2string(err)}) + } + }) + } +} + func testData() (tagMaps []map[string]string) { payloads := []string{ `{"context":{"appType":"KUBERNETES","agentId":"Picasa Kubernetes Agent","traceToken":"c80cc40b-3c34-0886-c7f0-773875a67ff0","timeStamp":"1682669743631","version":"1.0.0"},"metrics":[{"Name":"kubernetes_system_container","Tags":[{"Key":"cluster","Value":"domain"},{"Key":"container_name","Value":"kubelet"},{"Key":"node_name","Value":"minikube"},{"Key":"source","Value":"default|domain|minikube"},{"Key":"tenant","Value":"default"}],"Fields":[{"Key":"rootfs_available_bytes","Value":0},{"Key":"logsfs_available_bytes","Value":0},{"Key":"cpu_usage_nanocores","Value":79022110},{"Key":"cpu_usage_core_nanoseconds","Value":66139307000},{"Key":"memory_usage_bytes","Value":225746944},{"Key":"memory_rss_bytes","Value":45989888},{"Key":"logsfs_capacity_bytes","Value":0},{"Key":"memory_working_set_bytes","Value":91303936},{"Key":"memory_page_faults","Value":740115},{"Key":"memory_major_page_faults","Value":48},{"Key":"rootfs_capacity_bytes","Value":0}],"Tm":"2023-04-28T08:15:43.631Z","Tp":3}]}`,