From a9275b3d59b73e6a3b383d215d9e8f903bbbf1f1 Mon Sep 17 00:00:00 2001
From: Andrew Azores
Date: Tue, 26 Nov 2024 14:51:37 -0500
Subject: [PATCH] refactor

---
 .../cryostat/discovery/KubeApiDiscovery.java | 101 ++++++++++--------
 1 file changed, 55 insertions(+), 46 deletions(-)

diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
index 10134cf91..d5cd02f2e 100644
--- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
+++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
@@ -110,42 +110,42 @@ public class KubeApiDiscovery implements ResourceEventHandler<Endpoints> {
                 @Override
                 protected HashMap<String, SharedIndexInformer<Endpoints>> initialize()
                         throws ConcurrentException {
-                    // TODO: add support for some wildcard indicating a single Informer for any
-                    // namespace that Cryostat has permissions to. This will need some restructuring
-                    // of how the namespaces within the discovery tree are mapped.
                     var result = new HashMap<String, SharedIndexInformer<Endpoints>>();
-                    kubeConfig
-                            .getWatchNamespaces()
-                            .forEach(
-                                    ns -> {
-                                        SharedIndexInformer<Endpoints> informer;
-                                        if (ALL_NAMESPACES.equals(ns)) {
-                                            informer =
-                                                    client.endpoints()
-                                                            .inAnyNamespace()
-                                                            .inform(
-                                                                    KubeApiDiscovery.this,
-                                                                    informerResyncPeriod
-                                                                            .toMillis());
-                                        } else {
-                                            informer =
+                    if (watchAllNamespaces()) {
+                        result.put(
+                                ALL_NAMESPACES,
+                                client.endpoints()
+                                        .inAnyNamespace()
+                                        .inform(
+                                                KubeApiDiscovery.this,
+                                                informerResyncPeriod.toMillis()));
+                    } else {
+                        kubeConfig
+                                .getWatchNamespaces()
+                                .forEach(
+                                        ns -> {
+                                            result.put(
+                                                    ns,
                                                     client.endpoints()
                                                             .inNamespace(ns)
                                                             .inform(
                                                                     KubeApiDiscovery.this,
                                                                     informerResyncPeriod
-                                                                            .toMillis());
-                                        }
-                                        result.put(ns, informer);
-                                        logger.debugv(
-                                                "Started Endpoints SharedInformer for namespace"
-                                                        + " \"{0}\" with resync period {1}",
-                                                ns, informerResyncPeriod);
-                                    });
+                                                                            .toMillis()));
+                                            logger.debugv(
+                                                    "Started Endpoints SharedInformer for namespace"
+                                                            + " \"{0}\" with resync period {1}",
+                                                    ns, informerResyncPeriod);
+                                        });
+                    }
                     return result;
                 }
             };
 
+    private boolean watchAllNamespaces() {
+        return kubeConfig.getWatchNamespaces().stream().anyMatch(ns -> ALL_NAMESPACES.equals(ns));
+    }
+
     void onStart(@Observes StartupEvent evt) {
         if (!enabled()) {
             return;
         }
@@ -158,22 +158,26 @@ void onStart(@Observes StartupEvent evt) {
         logger.debugv("Starting {0} client", REALM);
 
         safeGetInformers();
-        resyncWorker.scheduleAtFixedRate(
-                () -> {
-                    try {
-                        logger.debug("Resyncing");
-                        notify(
-                                NamespaceQueryEvent.from(
-                                        kubeConfig.getWatchNamespaces().stream()
-                                                .filter(ns -> !ALL_NAMESPACES.equals(ns))
-                                                .toList()));
-                    } catch (Exception e) {
-                        logger.warn(e);
-                    }
-                },
-                0,
-                informerResyncPeriod.toMillis(),
-                TimeUnit.MILLISECONDS);
+        // TODO we should not need to force manual re-syncs this way - the Informer is already
+        // supposed to resync itself.
+        if (!watchAllNamespaces()) {
+            resyncWorker.scheduleAtFixedRate(
+                    () -> {
+                        try {
+                            logger.debug("Resyncing");
+                            notify(
+                                    NamespaceQueryEvent.from(
+                                            kubeConfig.getWatchNamespaces().stream()
+                                                    .filter(ns -> !ALL_NAMESPACES.equals(ns))
+                                                    .toList()));
+                        } catch (Exception e) {
+                            logger.warn(e);
+                        }
+                    },
+                    0,
+                    informerResyncPeriod.toMillis(),
+                    TimeUnit.MILLISECONDS);
+        }
     }
 
     void onStop(@Observes ShutdownEvent evt) {
@@ -323,11 +327,16 @@ public void handleQueryEvent(NamespaceQueryEvent evt) {
             }
 
             Stream<Endpoints> endpoints;
-            if (safeGetInformers().containsKey(namespace)) {
-                endpoints = safeGetInformers().get(namespace).getStore().list().stream();
-            } else {
+            if (watchAllNamespaces()) {
                 endpoints =
-                        client.endpoints().inNamespace(namespace).list().getItems().stream();
+                        safeGetInformers().get(ALL_NAMESPACES).getStore().list().stream()
+                                .filter(
+                                        ep ->
+                                                Objects.equals(
+                                                        ep.getMetadata().getNamespace(),
+                                                        namespace));
+            } else {
+                endpoints = safeGetInformers().get(namespace).getStore().list().stream();
             }
 
             Set<Target> observedTargets =
                     endpoints