Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(graphql): correct use of nonblocking operations, re-enable nonblocking execution #434

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 153 additions & 138 deletions src/main/java/io/cryostat/graphql/ActiveRecordings.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.graphql.api.Nullable;
import io.smallrye.graphql.execution.ExecutionException;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import jdk.jfr.RecordingState;
Expand All @@ -64,209 +67,221 @@ public class ActiveRecordings {
@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
Duration timeout;

@Blocking
@Transactional
@Blocking
@Mutation
@Description(
"Start a new Flight Recording on all Targets under the subtrees of the discovery nodes"
+ " matching the given filter")
public List<ActiveRecording> createRecording(
public Uni<List<ActiveRecording>> createRecording(
@NonNull DiscoveryNodeFilter nodes, @NonNull RecordingSettings recording)
throws QuantityConversionException {
var list =
DiscoveryNode.<DiscoveryNode>listAll().stream()
.filter(nodes)
.flatMap(
node ->
RootNode.recurseChildren(node, n -> n.target != null)
.stream()
.map(n -> n.target))
.toList();
var recordings = new ArrayList<ActiveRecording>();
for (var t : list) {
var template =
recordingHelper.getPreferredTemplate(
t, recording.template, TemplateType.valueOf(recording.templateType));
var r =
recordingHelper
.startRecording(
t,
Optional.ofNullable(recording.replace)
.map(RecordingReplace::valueOf)
.orElse(RecordingReplace.STOPPED),
template,
recording.asOptions(),
Optional.ofNullable(recording.metadata)
.map(s -> s.labels)
.orElse(Map.of()))
.await()
.atMost(Duration.ofSeconds(10));
recordings.add(r);
}
return recordings;
return Multi.createFrom()
.items(DiscoveryNode.<DiscoveryNode>listAll().stream())
.filter(nodes)
.onItem()
.transformToIterable(
node ->
RootNode.recurseChildren(node, n -> n.target != null).stream()
.map(n -> n.target)
.toList())
.onItem()
.transformToUniAndMerge(
t -> {
var template =
recordingHelper.getPreferredTemplate(
t,
recording.template,
TemplateType.valueOf(recording.templateType));
try {
return recordingHelper.startRecording(
t,
Optional.ofNullable(recording.replace)
.map(RecordingReplace::valueOf)
.orElse(RecordingReplace.STOPPED),
template,
recording.asOptions(),
Optional.ofNullable(recording.metadata)
.map(s -> s.labels)
.orElse(Map.of()));
} catch (QuantityConversionException e) {
logger.warn(e);
throw new ExecutionException(e);
}
})
.collect()
.asList();
}

@Blocking
@Transactional
@Blocking
@Mutation
@Description(
"Archive an existing Flight Recording matching the given filter, on all Targets under"
+ " the subtrees of the discovery nodes matching the given filter")
public List<ArchivedRecording> archiveRecording(
public Uni<List<ArchivedRecording>> archiveRecording(
@NonNull DiscoveryNodeFilter nodes, @Nullable ActiveRecordingsFilter recordings)
throws Exception {
var list =
DiscoveryNode.<DiscoveryNode>listAll().stream()
.filter(nodes)
.flatMap(
node ->
RootNode.recurseChildren(node, n -> n.target != null)
.stream()
.map(n -> n.target))
.flatMap(
t ->
recordingHelper.listActiveRecordings(t).stream()
.filter(
r ->
recordings == null
|| recordings.test(r)))
.toList();
var archives = new ArrayList<ArchivedRecording>();
for (var r : list) {
archives.add(recordingHelper.archiveRecording(r, null, null));
}
return archives;
return Multi.createFrom()
.items(DiscoveryNode.<DiscoveryNode>listAll().stream())
.filter(nodes)
.onItem()
.transformToIterable(
node ->
RootNode.recurseChildren(node, n -> n.target != null).stream()
.map(n -> n.target)
.toList())
.onItem()
.transformToIterable(
t ->
recordingHelper.listActiveRecordings(t).stream()
.filter(r -> recordings == null || recordings.test(r))
.toList())
.onItem()
.transform(
r -> {
try {
return recordingHelper.archiveRecording(r, null, null);
} catch (Exception e) {
logger.warn(e);
throw new ExecutionException(e);
}
})
.collect()
.asList();
}

@Blocking
@Transactional
@Blocking
@Mutation
@Description(
"Stop an existing Flight Recording matching the given filter, on all Targets under"
+ " the subtrees of the discovery nodes matching the given filter")
public List<ActiveRecording> stopRecording(
public Uni<List<ActiveRecording>> stopRecording(
@NonNull DiscoveryNodeFilter nodes, @Nullable ActiveRecordingsFilter recordings)
throws Exception {
var list =
DiscoveryNode.<DiscoveryNode>listAll().stream()
.filter(nodes)
.flatMap(
node ->
RootNode.recurseChildren(node, n -> n.target != null)
.stream()
.map(n -> n.target))
.flatMap(
t ->
recordingHelper.listActiveRecordings(t).stream()
.filter(
r ->
recordings == null
|| recordings.test(r)))
.toList();
for (var r : list) {
recordingHelper.stopRecording(r).await().atMost(Duration.ofSeconds(10));
}
return list;
return Multi.createFrom()
.items(DiscoveryNode.<DiscoveryNode>listAll().stream())
.filter(nodes)
.onItem()
.transformToIterable(
node ->
RootNode.recurseChildren(node, n -> n.target != null).stream()
.map(n -> n.target)
.toList())
.onItem()
.transformToIterable(
t ->
recordingHelper.listActiveRecordings(t).stream()
.filter(r -> recordings == null || recordings.test(r))
.toList())
.onItem()
.transformToUniAndMerge(
r -> {
try {
return recordingHelper.stopRecording(r);
} catch (Exception e) {
logger.warn(e);
throw new ExecutionException(e);
}
})
.collect()
.asList();
}

@Blocking
@Transactional
@Blocking
@Mutation
@Description(
"Delete an existing Flight Recording matching the given filter, on all Targets under"
+ " the subtrees of the discovery nodes matching the given filter")
public List<ActiveRecording> deleteRecording(
public Uni<List<ActiveRecording>> deleteRecording(
@NonNull DiscoveryNodeFilter nodes, @Nullable ActiveRecordingsFilter recordings) {
var list =
DiscoveryNode.<DiscoveryNode>listAll().stream()
.filter(nodes)
.flatMap(
node ->
RootNode.recurseChildren(node, n -> n.target != null)
.stream()
.map(n -> n.target))
.flatMap(
t ->
recordingHelper.listActiveRecordings(t).stream()
.filter(
r ->
recordings == null
|| recordings.test(r)))
.toList();
for (var r : list) {
recordingHelper.deleteRecording(r).await().atMost(Duration.ofSeconds(10));
}
return list;
return Multi.createFrom()
.items(DiscoveryNode.<DiscoveryNode>listAll().stream())
.filter(nodes)
.onItem()
.transformToIterable(
node ->
RootNode.recurseChildren(node, n -> n.target != null).stream()
.map(n -> n.target)
.toList())
.onItem()
.transformToIterable(
t ->
recordingHelper.listActiveRecordings(t).stream()
.filter(r -> recordings == null || recordings.test(r))
.toList())
.onItem()
.transformToUniAndMerge(recordingHelper::deleteRecording)
.collect()
.asList();
}

@Blocking
@Transactional
@Blocking
@Mutation
@Description(
"Create a Flight Recorder Snapshot on all Targets under"
+ " the subtrees of the discovery nodes matching the given filter")
public List<ActiveRecording> createSnapshot(@NonNull DiscoveryNodeFilter nodes) {
var targets =
DiscoveryNode.<DiscoveryNode>listAll().stream()
.filter(nodes)
.flatMap(
node ->
RootNode.recurseChildren(node, n -> n.target != null)
.stream()
.map(n -> n.target))
.toList();
var snapshots = new ArrayList<ActiveRecording>();
for (var t : targets) {
snapshots.add(recordingHelper.createSnapshot(t).await().atMost(Duration.ofSeconds(10)));
}
return snapshots;
public Uni<List<ActiveRecording>> createSnapshot(@NonNull DiscoveryNodeFilter nodes) {
return Multi.createFrom()
.items(DiscoveryNode.<DiscoveryNode>listAll().stream())
.filter(nodes)
.onItem()
.transformToIterable(
node ->
RootNode.recurseChildren(node, n -> n.target != null).stream()
.map(n -> n.target)
.toList())
.onItem()
.transformToUniAndMerge(recordingHelper::createSnapshot)
.collect()
.asList();
}

@Blocking
@Transactional
@Blocking
@Description("Start a new Flight Recording on the specified Target")
public ActiveRecording doStartRecording(
public Uni<ActiveRecording> doStartRecording(
@Source Target target, @NonNull RecordingSettings recording)
throws QuantityConversionException {
var fTarget = Target.getTargetById(target.id);
Template template =
recordingHelper.getPreferredTemplate(
fTarget, recording.template, TemplateType.valueOf(recording.templateType));
return recordingHelper
.startRecording(
fTarget,
Optional.ofNullable(recording.replace)
.map(RecordingReplace::valueOf)
.orElse(RecordingReplace.STOPPED),
template,
recording.asOptions(),
Optional.ofNullable(recording.metadata).map(s -> s.labels).orElse(Map.of()))
.await()
.atMost(timeout);
return recordingHelper.startRecording(
fTarget,
Optional.ofNullable(recording.replace)
.map(RecordingReplace::valueOf)
.orElse(RecordingReplace.STOPPED),
template,
recording.asOptions(),
Optional.ofNullable(recording.metadata).map(s -> s.labels).orElse(Map.of()));
}

@Blocking
@Transactional
@Blocking
@Description("Create a new Flight Recorder Snapshot on the specified Target")
public ActiveRecording doSnapshot(@Source Target target) {
public Uni<ActiveRecording> doSnapshot(@Source Target target) {
var fTarget = Target.getTargetById(target.id);
return recordingHelper.createSnapshot(fTarget).await().atMost(timeout);
return recordingHelper.createSnapshot(fTarget);
}

@Blocking
@Transactional
@Blocking
@Description("Stop the specified Flight Recording")
public ActiveRecording doStop(@Source ActiveRecording recording) throws Exception {
public Uni<ActiveRecording> doStop(@Source ActiveRecording recording) throws Exception {
var ar = ActiveRecording.<ActiveRecording>find("id", recording.id).singleResult();
return recordingHelper.stopRecording(ar).await().atMost(timeout);
return recordingHelper.stopRecording(ar);
}

@Blocking
@Transactional
@Blocking
@Description("Delete the specified Flight Recording")
public ActiveRecording doDelete(@Source ActiveRecording recording) {
public Uni<ActiveRecording> doDelete(@Source ActiveRecording recording) {
var ar = ActiveRecording.<ActiveRecording>find("id", recording.id).singleResult();
return recordingHelper.deleteRecording(ar).await().atMost(timeout);
return recordingHelper.deleteRecording(ar);
}

@Blocking
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ quarkus.smallrye-openapi.info-license-name=Apache 2.0
quarkus.smallrye-openapi.info-license-url=https://github.com/cryostatio/cryostat3/blob/main/LICENSE

quarkus.smallrye-graphql.events.enabled=true
quarkus.smallrye-graphql.nonblocking.enabled=false
quarkus.smallrye-graphql.nonblocking.enabled=true
quarkus.smallrye-graphql.root-path=/api/v3/graphql
quarkus.smallrye-graphql.http.get.enabled=true
quarkus.smallrye-graphql.print-data-fetcher-exception=true
Expand Down
Loading