Skip to content

Commit

Permalink
feat(k8s): implement wildcard All Namespaces discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewazores committed Nov 28, 2024
1 parent a141072 commit 2e67e53
Showing 1 changed file with 43 additions and 9 deletions.
52 changes: 43 additions & 9 deletions src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.management.remote.JMXServiceURL;

Expand Down Expand Up @@ -70,6 +71,7 @@
@ApplicationScoped
public class KubeApiDiscovery implements ResourceEventHandler<Endpoints> {

private static final String ALL_NAMESPACES = "*";
private static final String NAMESPACE_QUERY_ADDR = "NS_QUERY";
private static final String ENDPOINTS_DISCOVERY_ADDR = "ENDPOINTS_DISC";

Expand Down Expand Up @@ -116,13 +118,25 @@ protected HashMap<String, SharedIndexInformer<Endpoints>> initialize()
.getWatchNamespaces()
.forEach(
ns -> {
result.put(
ns,
client.endpoints()
.inNamespace(ns)
.inform(
KubeApiDiscovery.this,
informerResyncPeriod.toMillis()));
SharedIndexInformer<Endpoints> informer;
if (ALL_NAMESPACES.equals(ns)) {
informer =
client.endpoints()
.inAnyNamespace()
.inform(
KubeApiDiscovery.this,
informerResyncPeriod
.toMillis());
} else {
informer =
client.endpoints()
.inNamespace(ns)
.inform(
KubeApiDiscovery.this,
informerResyncPeriod
.toMillis());
}
result.put(ns, informer);
logger.debugv(
"Started Endpoints SharedInformer for namespace"
+ " \"{0}\" with resync period {1}",
Expand All @@ -148,7 +162,11 @@ void onStart(@Observes StartupEvent evt) {
() -> {
try {
logger.debug("Resyncing");
notify(NamespaceQueryEvent.from(kubeConfig.getWatchNamespaces()));
notify(
NamespaceQueryEvent.from(
kubeConfig.getWatchNamespaces().stream()
.filter(ns -> !ALL_NAMESPACES.equals(ns))
.toList()));
} catch (Exception e) {
logger.warn(e);
}
Expand Down Expand Up @@ -226,6 +244,15 @@ List<TargetTuple> tuplesFromEndpoints(Endpoints endpoints) {
for (EndpointPort port : subset.getPorts()) {
for (EndpointAddress addr : subset.getAddresses()) {
var ref = addr.getTargetRef();
if (ref == null) {
logger.debugv(
"Endpoints object {0} in {1} with address {2} had a null"
+ " targetRef",
endpoints.getMetadata().getName(),
endpoints.getMetadata().getNamespace(),
addr.getIp());
continue;
}
tts.add(
new TargetTuple(
ref,
Expand Down Expand Up @@ -295,8 +322,15 @@ public void handleQueryEvent(NamespaceQueryEvent evt) {
persistedTargets.add(node.target);
}

Stream<Endpoints> endpoints;
if (safeGetInformers().containsKey(namespace)) {
endpoints = safeGetInformers().get(namespace).getStore().list().stream();
} else {
endpoints =
client.endpoints().inNamespace(namespace).list().getItems().stream();
}
Set<Target> observedTargets =
safeGetInformers().get(namespace).getStore().list().stream()
endpoints
.map((endpoint) -> getTargetTuplesFrom(endpoint))
.flatMap(List::stream)
.filter((tuple) -> Objects.nonNull(tuple.objRef))
Expand Down

0 comments on commit 2e67e53

Please sign in to comment.