refactor
andrewazores committed Nov 28, 2024
1 parent 2e67e53 · commit 908deda
Showing 1 changed file with 55 additions and 46 deletions.
101 changes: 55 additions & 46 deletions src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
@@ -110,42 +110,42 @@ public class KubeApiDiscovery implements ResourceEventHandler<Endpoints> {
                 @Override
                 protected HashMap<String, SharedIndexInformer<Endpoints>> initialize()
                         throws ConcurrentException {
-                    // TODO: add support for some wildcard indicating a single Informer for any
-                    // namespace that Cryostat has permissions to. This will need some restructuring
-                    // of how the namespaces within the discovery tree are mapped.
                     var result = new HashMap<String, SharedIndexInformer<Endpoints>>();
-                    kubeConfig
-                            .getWatchNamespaces()
-                            .forEach(
-                                    ns -> {
-                                        SharedIndexInformer<Endpoints> informer;
-                                        if (ALL_NAMESPACES.equals(ns)) {
-                                            informer =
-                                                    client.endpoints()
-                                                            .inAnyNamespace()
-                                                            .inform(
-                                                                    KubeApiDiscovery.this,
-                                                                    informerResyncPeriod
-                                                                            .toMillis());
-                                        } else {
-                                            informer =
-                                                    client.endpoints()
-                                                            .inNamespace(ns)
-                                                            .inform(
-                                                                    KubeApiDiscovery.this,
-                                                                    informerResyncPeriod
-                                                                            .toMillis());
-                                        }
-                                        result.put(ns, informer);
-                                        logger.debugv(
-                                                "Started Endpoints SharedInformer for namespace"
-                                                        + " \"{0}\" with resync period {1}",
-                                                ns, informerResyncPeriod);
-                                    });
+                    if (watchAllNamespaces()) {
+                        result.put(
+                                ALL_NAMESPACES,
+                                client.endpoints()
+                                        .inAnyNamespace()
+                                        .inform(
+                                                KubeApiDiscovery.this,
+                                                informerResyncPeriod.toMillis()));
+                    } else {
+                        kubeConfig
+                                .getWatchNamespaces()
+                                .forEach(
+                                        ns -> {
+                                            result.put(
+                                                    ns,
+                                                    client.endpoints()
+                                                            .inNamespace(ns)
+                                                            .inform(
+                                                                    KubeApiDiscovery.this,
+                                                                    informerResyncPeriod
+                                                                            .toMillis()));
+                                            logger.debugv(
+                                                    "Started Endpoints SharedInformer for namespace"
+                                                            + " \"{0}\" with resync period {1}",
+                                                    ns, informerResyncPeriod);
+                                        });
+                    }
                     return result;
                 }
             };
 
+    private boolean watchAllNamespaces() {
+        return kubeConfig.getWatchNamespaces().stream().anyMatch(ns -> ALL_NAMESPACES.equals(ns));
+    }
+
     void onStart(@Observes StartupEvent evt) {
         if (!enabled()) {
             return;
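
For readers unfamiliar with the API in play, here is a minimal standalone sketch of the Fabric8 informer pattern this initializer builds on (class name, handler bodies, and the 30s resync value are illustrative, not taken from this commit):

import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

public class EndpointsInformerSketch {
    public static void main(String[] args) throws Exception {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            ResourceEventHandler<Endpoints> handler =
                    new ResourceEventHandler<>() {
                        @Override
                        public void onAdd(Endpoints obj) {
                            System.out.println("added " + obj.getMetadata().getName());
                        }

                        @Override
                        public void onUpdate(Endpoints oldObj, Endpoints newObj) {
                            System.out.println("updated " + newObj.getMetadata().getName());
                        }

                        @Override
                        public void onDelete(Endpoints obj, boolean deletedFinalStateUnknown) {
                            System.out.println("deleted " + obj.getMetadata().getName());
                        }
                    };
            // One informer watching every namespace the client can see, with a
            // periodic resync, as in the commit's watchAllNamespaces() branch.
            SharedIndexInformer<Endpoints> informer =
                    client.endpoints().inAnyNamespace().inform(handler, 30_000L);
            Thread.sleep(60_000); // let events arrive; informer closes with the client
        }
    }
}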
@@ -158,22 +158,26 @@ void onStart(@Observes StartupEvent evt) {
 
         logger.debugv("Starting {0} client", REALM);
         safeGetInformers();
-        resyncWorker.scheduleAtFixedRate(
-                () -> {
-                    try {
-                        logger.debug("Resyncing");
-                        notify(
-                                NamespaceQueryEvent.from(
-                                        kubeConfig.getWatchNamespaces().stream()
-                                                .filter(ns -> !ALL_NAMESPACES.equals(ns))
-                                                .toList()));
-                    } catch (Exception e) {
-                        logger.warn(e);
-                    }
-                },
-                0,
-                informerResyncPeriod.toMillis(),
-                TimeUnit.MILLISECONDS);
+        // TODO we should not need to force manual re-syncs this way - the Informer is already
+        // supposed to resync itself.
+        if (!watchAllNamespaces()) {
+            resyncWorker.scheduleAtFixedRate(
+                    () -> {
+                        try {
+                            logger.debug("Resyncing");
+                            notify(
+                                    NamespaceQueryEvent.from(
+                                            kubeConfig.getWatchNamespaces().stream()
+                                                    .filter(ns -> !ALL_NAMESPACES.equals(ns))
+                                                    .toList()));
+                        } catch (Exception e) {
+                            logger.warn(e);
+                        }
+                    },
+                    0,
+                    informerResyncPeriod.toMillis(),
+                    TimeUnit.MILLISECONDS);
+        }
     }
 
     void onStop(@Observes ShutdownEvent evt) {
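
The new guard means the manual fallback resync is only scheduled in per-namespace mode; with the wildcard configured, the single all-namespaces informer's own resync is trusted. A small illustration of the predicate, assuming ALL_NAMESPACES is the literal "*" (its value is not shown in this diff):

import java.util.List;

class ResyncGuardSketch {
    static final String ALL_NAMESPACES = "*"; // assumed value, not shown in this diff

    // Mirrors watchAllNamespaces() from the diff.
    static boolean watchAllNamespaces(List<String> namespaces) {
        return namespaces.stream().anyMatch(ALL_NAMESPACES::equals);
    }

    public static void main(String[] args) {
        // false -> the fixed-rate manual resync task is scheduled
        System.out.println(watchAllNamespaces(List.of("default", "cryostat")));
        // true -> no manual task; the informer resyncs itself
        System.out.println(watchAllNamespaces(List.of("default", "*")));
    }
}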
@@ -323,11 +327,16 @@ public void handleQueryEvent(NamespaceQueryEvent evt) {
             }
 
             Stream<Endpoints> endpoints;
-            if (safeGetInformers().containsKey(namespace)) {
-                endpoints = safeGetInformers().get(namespace).getStore().list().stream();
-            } else {
-                endpoints =
-                        client.endpoints().inNamespace(namespace).list().getItems().stream();
+            if (watchAllNamespaces()) {
+                endpoints =
+                        safeGetInformers().get(ALL_NAMESPACES).getStore().list().stream()
+                                .filter(
+                                        ep ->
+                                                Objects.equals(
+                                                        ep.getMetadata().getNamespace(),
+                                                        namespace));
+            } else {
+                endpoints = safeGetInformers().get(namespace).getStore().list().stream();
             }
             Set<Target> observedTargets =
                     endpoints
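
Condensed, the new lookup amounts to the helper below (method name hypothetical; watchAllNamespaces(), safeGetInformers(), and ALL_NAMESPACES as in the surrounding file). In all-namespaces mode the single informer's store caches Endpoints for every namespace, so the listing must be filtered to the queried namespace; in per-namespace mode each namespace reads its own dedicated store directly.

// Hypothetical condensation of the branch above, not part of the commit.
Stream<Endpoints> endpointsIn(String namespace) {
    return watchAllNamespaces()
            ? safeGetInformers().get(ALL_NAMESPACES).getStore().list().stream()
                    .filter(ep -> Objects.equals(ep.getMetadata().getNamespace(), namespace))
            : safeGetInformers().get(namespace).getStore().list().stream();
}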
