Skip to content

Commit

Permalink
fix(startup): platform detection progresses with limited parallelism (#…
Browse files Browse the repository at this point in the history
…1658)

* fix(threads): use independent pools where sensible rather than ForkJoinPool.commonPool()

* fix(vertx): ensure minimum event loop pool size
  • Loading branch information
andrewazores authored Sep 7, 2023
1 parent 2f98fe0 commit 93f1089
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 47 deletions.
2 changes: 2 additions & 0 deletions src/main/java/io/cryostat/discovery/DiscoveryModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.Executors;

import javax.inject.Named;
import javax.inject.Singleton;
Expand Down Expand Up @@ -79,6 +80,7 @@ static DiscoveryStorage provideDiscoveryStorage(
Logger logger) {
return new DiscoveryStorage(
deployer,
Executors.newCachedThreadPool(),
pingPeriod,
builtin,
dao,
Expand Down
41 changes: 21 additions & 20 deletions src/main/java/io/cryostat/discovery/DiscoveryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;

import javax.script.ScriptException;
Expand Down Expand Up @@ -69,6 +69,7 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle {
public static final URI NO_CALLBACK = null;
private final Duration pingPeriod;
private final VerticleDeployer deployer;
private final ExecutorService executor;
private final Lazy<BuiltInDiscovery> builtin;
private final PluginInfoDao dao;
private final Lazy<JvmIdHelper> jvmIdHelper;
Expand All @@ -86,6 +87,7 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle {

DiscoveryStorage(
VerticleDeployer deployer,
ExecutorService executor,
Duration pingPeriod,
Lazy<BuiltInDiscovery> builtin,
PluginInfoDao dao,
Expand All @@ -96,6 +98,7 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle {
WebClient http,
Logger logger) {
this.deployer = deployer;
this.executor = executor;
this.pingPeriod = pingPeriod;
this.builtin = builtin;
this.dao = dao;
Expand Down Expand Up @@ -183,26 +186,24 @@ public void start(Promise<Void> future) throws Exception {
}

private void testNonConnectedTargets(Predicate<Entry<TargetNode, UUID>> predicate) {
ForkJoinPool.commonPool()
.execute(
() -> {
Map<TargetNode, UUID> copy = new HashMap<>(nonConnectableTargets);
for (var entry : copy.entrySet()) {
try {
if (predicate.test(entry)) {
nonConnectableTargets.remove(entry.getKey());
UUID id = entry.getValue();
PluginInfo plugin = getById(id).orElseThrow();
EnvironmentNode original =
gson.fromJson(
plugin.getSubtree(), EnvironmentNode.class);
update(id, original.getChildren());
}
} catch (JsonSyntaxException e) {
throw new RuntimeException(e);
}
executor.execute(
() -> {
Map<TargetNode, UUID> copy = new HashMap<>(nonConnectableTargets);
for (var entry : copy.entrySet()) {
try {
if (predicate.test(entry)) {
nonConnectableTargets.remove(entry.getKey());
UUID id = entry.getValue();
PluginInfo plugin = getById(id).orElseThrow();
EnvironmentNode original =
gson.fromJson(plugin.getSubtree(), EnvironmentNode.class);
update(id, original.getChildren());
}
});
} catch (JsonSyntaxException e) {
throw new RuntimeException(e);
}
}
});
}

@Override
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/io/cryostat/net/AgentClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ExecutorService;

import javax.management.ObjectName;
import javax.script.ScriptException;
Expand All @@ -49,7 +49,6 @@

import com.google.gson.Gson;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
Expand All @@ -64,7 +63,7 @@
public class AgentClient {
public static final String NULL_CREDENTIALS = "No credentials found for agent";

private final Vertx vertx;
private final ExecutorService executor;
private final Gson gson;
private final long httpTimeout;
private final WebClient webClient;
Expand All @@ -73,14 +72,14 @@ public class AgentClient {
private final Logger logger;

AgentClient(
Vertx vertx,
ExecutorService executor,
Gson gson,
long httpTimeout,
WebClient webClient,
CredentialsManager credentialsManager,
URI agentUri,
Logger logger) {
this.vertx = vertx;
this.executor = executor;
this.gson = gson;
this.httpTimeout = httpTimeout;
this.webClient = webClient;
Expand Down Expand Up @@ -237,7 +236,7 @@ private <T> Future<HttpResponse<T>> invoke(HttpMethod mtd, String path, BodyCode
throw new RuntimeException(e);
}
},
ForkJoinPool.commonPool())
executor)
.exceptionally(
t -> {
throw new RuntimeException(t);
Expand All @@ -246,21 +245,21 @@ private <T> Future<HttpResponse<T>> invoke(HttpMethod mtd, String path, BodyCode

static class Factory {

private final Vertx vertx;
private final ExecutorService executor;
private final Gson gson;
private final long httpTimeout;
private final WebClient webClient;
private final CredentialsManager credentialsManager;
private final Logger logger;

Factory(
Vertx vertx,
ExecutorService executor,
Gson gson,
long httpTimeout,
WebClient webClient,
CredentialsManager credentialsManager,
Logger logger) {
this.vertx = vertx;
this.executor = executor;
this.gson = gson;
this.httpTimeout = httpTimeout;
this.webClient = webClient;
Expand All @@ -270,7 +269,7 @@ static class Factory {

AgentClient create(URI agentUri) {
return new AgentClient(
vertx, gson, httpTimeout, webClient, credentialsManager, agentUri, logger);
executor, gson, httpTimeout, webClient, credentialsManager, agentUri, logger);
}
}

Expand Down
18 changes: 13 additions & 5 deletions src/main/java/io/cryostat/net/NetworkModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Executors;

import javax.inject.Named;
import javax.inject.Singleton;
Expand Down Expand Up @@ -104,14 +104,18 @@ static AgentConnection.Factory provideAgentConnectionFactory(
@Provides
@Singleton
static AgentClient.Factory provideAgentClientFactory(
Vertx vertx,
Gson gson,
@Named(HttpModule.HTTP_REQUEST_TIMEOUT_SECONDS) long httpTimeout,
WebClient webClient,
CredentialsManager credentialsManager,
Logger logger) {
return new AgentClient.Factory(
vertx, gson, httpTimeout, webClient, credentialsManager, logger);
Executors.newCachedThreadPool(),
gson,
httpTimeout,
webClient,
credentialsManager,
logger);
}

@Provides
Expand All @@ -127,7 +131,7 @@ static TargetConnectionManager provideTargetConnectionManager(
connectionToolkit,
agentConnectionFactory,
storage,
ForkJoinPool.commonPool(),
Executors.newCachedThreadPool(),
Scheduler.systemScheduler(),
maxTargetTtl,
maxTargetConnections,
Expand All @@ -144,7 +148,11 @@ static JFRConnectionToolkit provideJFRConnectionToolkit(
@Provides
@Singleton
static Vertx provideVertx() {
return Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
VertxOptions defaults = new VertxOptions();
return Vertx.vertx(
new VertxOptions()
.setPreferNativeTransport(true)
.setEventLoopPoolSize(defaults.getEventLoopPoolSize() + 4));
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -94,8 +93,7 @@ public boolean isAvailable() {
private boolean testDockerApi() {
CompletableFuture<Boolean> result = new CompletableFuture<>();
URI requestPath = URI.create("http://d/v1.41/info");
ForkJoinPool.commonPool()
.submit(
new Thread(
() -> {
webClient
.get()
Expand All @@ -117,7 +115,8 @@ private boolean testDockerApi() {
}
result.complete(true);
});
});
})
.start();
try {
return result.get(2, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -94,8 +94,7 @@ public boolean isAvailable() {
private boolean testPodmanApi() {
CompletableFuture<Boolean> result = new CompletableFuture<>();
URI requestPath = URI.create("http://d/info");
ForkJoinPool.commonPool()
.submit(
new Thread(
() -> {
webClient
.get()
Expand All @@ -117,7 +116,8 @@ private boolean testPodmanApi() {
}
result.complete(true);
});
});
})
.start();
try {
return result.get(2, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
Expand All @@ -130,7 +130,7 @@ private boolean testPodmanApi() {
public PodmanPlatformClient getPlatformClient() {
logger.info("Selected {} Strategy", getClass().getSimpleName());
return new PodmanPlatformClient(
ForkJoinPool.commonPool(),
Executors.newSingleThreadExecutor(),
webClient,
vertx,
getSocket(),
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/cryostat/recordings/RecordingsModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Executors;

import javax.inject.Named;
import javax.inject.Provider;
Expand Down Expand Up @@ -177,7 +177,7 @@ static RecordingMetadataManager provideRecordingMetadataManager(
PosixFilePermission.OWNER_EXECUTE)));
}
return new RecordingMetadataManager(
ForkJoinPool.commonPool(),
Executors.newSingleThreadExecutor(),
metadataDir,
archivedRecordingsPath,
connectionTimeoutSeconds,
Expand Down Expand Up @@ -210,7 +210,7 @@ static JvmIdHelper provideJvmIdHelper(
credentialsManager,
storage,
connectionTimeoutSeconds,
ForkJoinPool.commonPool(),
Executors.newCachedThreadPool(),
Scheduler.systemScheduler(),
base32,
logger);
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import javax.inject.Singleton;

import io.cryostat.DirectExecutorService;
import io.cryostat.MainModule;
import io.cryostat.MockVertx;
import io.cryostat.VerticleDeployer;
Expand Down Expand Up @@ -106,6 +107,7 @@ void setup() {
this.storage =
new DiscoveryStorage(
deployer,
new DirectExecutorService(),
Duration.ofMinutes(5),
() -> builtin,
dao,
Expand Down

0 comments on commit 93f1089

Please sign in to comment.