From 2e67e53345ec0c1607db9441be47c359be8b9a8b Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Tue, 26 Nov 2024 14:36:32 -0500 Subject: [PATCH] feat(k8s): implement wildcard All Namespaces discovery --- .../cryostat/discovery/KubeApiDiscovery.java | 52 +++++++++++++++---- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 685dbd92c..10134cf91 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.management.remote.JMXServiceURL; @@ -70,6 +71,7 @@ @ApplicationScoped public class KubeApiDiscovery implements ResourceEventHandler { + private static final String ALL_NAMESPACES = "*"; private static final String NAMESPACE_QUERY_ADDR = "NS_QUERY"; private static final String ENDPOINTS_DISCOVERY_ADDR = "ENDPOINTS_DISC"; @@ -116,13 +118,25 @@ protected HashMap> initialize() .getWatchNamespaces() .forEach( ns -> { - result.put( - ns, - client.endpoints() - .inNamespace(ns) - .inform( - KubeApiDiscovery.this, - informerResyncPeriod.toMillis())); + SharedIndexInformer informer; + if (ALL_NAMESPACES.equals(ns)) { + informer = + client.endpoints() + .inAnyNamespace() + .inform( + KubeApiDiscovery.this, + informerResyncPeriod + .toMillis()); + } else { + informer = + client.endpoints() + .inNamespace(ns) + .inform( + KubeApiDiscovery.this, + informerResyncPeriod + .toMillis()); + } + result.put(ns, informer); logger.debugv( "Started Endpoints SharedInformer for namespace" + " \"{0}\" with resync period {1}", @@ -148,7 +162,11 @@ void onStart(@Observes StartupEvent evt) { () -> { try { logger.debug("Resyncing"); - notify(NamespaceQueryEvent.from(kubeConfig.getWatchNamespaces())); + notify( + NamespaceQueryEvent.from( + kubeConfig.getWatchNamespaces().stream() + .filter(ns -> !ALL_NAMESPACES.equals(ns)) + .toList())); } catch (Exception e) { logger.warn(e); } @@ -226,6 +244,15 @@ List tuplesFromEndpoints(Endpoints endpoints) { for (EndpointPort port : subset.getPorts()) { for (EndpointAddress addr : subset.getAddresses()) { var ref = addr.getTargetRef(); + if (ref == null) { + logger.debugv( + "Endpoints object {0} in {1} with address {2} had a null" + + " targetRef", + endpoints.getMetadata().getName(), + endpoints.getMetadata().getNamespace(), + addr.getIp()); + continue; + } tts.add( new TargetTuple( ref, @@ -295,8 +322,15 @@ public void handleQueryEvent(NamespaceQueryEvent evt) { persistedTargets.add(node.target); } + Stream endpoints; + if (safeGetInformers().containsKey(namespace)) { + endpoints = safeGetInformers().get(namespace).getStore().list().stream(); + } else { + endpoints = + client.endpoints().inNamespace(namespace).list().getItems().stream(); + } Set observedTargets = - safeGetInformers().get(namespace).getStore().list().stream() + endpoints .map((endpoint) -> getTargetTuplesFrom(endpoint)) .flatMap(List::stream) .filter((tuple) -> Objects.nonNull(tuple.objRef))