From b6efb9273e6644e5fca63ff9b87142e0bf321648 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Sat, 4 May 2024 01:10:42 -0400 Subject: [PATCH 1/2] fix(graphql): correct use of nonblocking operations, re-enable nonblocking execution --- .../io/cryostat/graphql/ActiveRecordings.java | 272 +++++++++--------- src/main/resources/application.properties | 2 +- 2 files changed, 135 insertions(+), 139 deletions(-) diff --git a/src/main/java/io/cryostat/graphql/ActiveRecordings.java b/src/main/java/io/cryostat/graphql/ActiveRecordings.java index a9877ee20..900ce0879 100644 --- a/src/main/java/io/cryostat/graphql/ActiveRecordings.java +++ b/src/main/java/io/cryostat/graphql/ActiveRecordings.java @@ -44,6 +44,9 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.smallrye.common.annotation.Blocking; import io.smallrye.graphql.api.Nullable; +import io.smallrye.graphql.execution.ExecutionException; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; import jakarta.inject.Inject; import jakarta.transaction.Transactional; import jdk.jfr.RecordingState; @@ -64,209 +67,202 @@ public class ActiveRecordings { @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration timeout; - @Blocking @Transactional @Mutation @Description( "Start a new Flight Recording on all Targets under the subtrees of the discovery nodes" + " matching the given filter") - public List createRecording( + public Multi createRecording( @NonNull DiscoveryNodeFilter nodes, @NonNull RecordingSettings recording) throws QuantityConversionException { - var list = - DiscoveryNode.listAll().stream() - .filter(nodes) - .flatMap( - node -> - RootNode.recurseChildren(node, n -> n.target != null) - .stream() - .map(n -> n.target)) - .toList(); - var recordings = new ArrayList(); - for (var t : list) { - var template = - recordingHelper.getPreferredTemplate( - t, recording.template, TemplateType.valueOf(recording.templateType)); - var r = - recordingHelper - .startRecording( - t, - Optional.ofNullable(recording.replace) - .map(RecordingReplace::valueOf) - .orElse(RecordingReplace.STOPPED), - template, - recording.asOptions(), - Optional.ofNullable(recording.metadata) - .map(s -> s.labels) - .orElse(Map.of())) - .await() - .atMost(Duration.ofSeconds(10)); - recordings.add(r); - } - return recordings; + return Multi.createFrom() + .items(DiscoveryNode.listAll().stream()) + .filter(nodes) + .onItem() + .transformToIterable( + node -> + RootNode.recurseChildren(node, n -> n.target != null).stream() + .map(n -> n.target) + .toList()) + .onItem() + .transformToUniAndMerge( + t -> { + var template = + recordingHelper.getPreferredTemplate( + t, + recording.template, + TemplateType.valueOf(recording.templateType)); + try { + return recordingHelper.startRecording( + t, + Optional.ofNullable(recording.replace) + .map(RecordingReplace::valueOf) + .orElse(RecordingReplace.STOPPED), + template, + recording.asOptions(), + Optional.ofNullable(recording.metadata) + .map(s -> s.labels) + .orElse(Map.of())); + } catch (QuantityConversionException e) { + logger.warn(e); + throw new ExecutionException(e); + } + }); } - @Blocking @Transactional @Mutation @Description( "Archive an existing Flight Recording matching the given filter, on all Targets under" + " the subtrees of the discovery nodes matching the given filter") - public List archiveRecording( + public Multi archiveRecording( @NonNull DiscoveryNodeFilter nodes, @Nullable ActiveRecordingsFilter recordings) throws Exception { - var list = - DiscoveryNode.listAll().stream() - .filter(nodes) - .flatMap( - node -> - RootNode.recurseChildren(node, n -> n.target != null) - .stream() - .map(n -> n.target)) - .flatMap( - t -> - recordingHelper.listActiveRecordings(t).stream() - .filter( - r -> - recordings == null - || recordings.test(r))) - .toList(); - var archives = new ArrayList(); - for (var r : list) { - archives.add(recordingHelper.archiveRecording(r, null, null)); - } - return archives; + return Multi.createFrom() + .items(DiscoveryNode.listAll().stream()) + .filter(nodes) + .onItem() + .transformToIterable( + node -> + RootNode.recurseChildren(node, n -> n.target != null).stream() + .map(n -> n.target) + .toList()) + .onItem() + .transformToIterable( + t -> + recordingHelper.listActiveRecordings(t).stream() + .filter(r -> recordings == null || recordings.test(r)) + .toList()) + .onItem() + .transform( + r -> { + try { + return recordingHelper.archiveRecording(r, null, null); + } catch (Exception e) { + logger.warn(e); + throw new ExecutionException(e); + } + }); } - @Blocking @Transactional @Mutation @Description( "Stop an existing Flight Recording matching the given filter, on all Targets under" + " the subtrees of the discovery nodes matching the given filter") - public List stopRecording( + public Multi stopRecording( @NonNull DiscoveryNodeFilter nodes, @Nullable ActiveRecordingsFilter recordings) throws Exception { - var list = - DiscoveryNode.listAll().stream() - .filter(nodes) - .flatMap( - node -> - RootNode.recurseChildren(node, n -> n.target != null) - .stream() - .map(n -> n.target)) - .flatMap( - t -> - recordingHelper.listActiveRecordings(t).stream() - .filter( - r -> - recordings == null - || recordings.test(r))) - .toList(); - for (var r : list) { - recordingHelper.stopRecording(r).await().atMost(Duration.ofSeconds(10)); - } - return list; + return Multi.createFrom() + .items(DiscoveryNode.listAll().stream()) + .filter(nodes) + .onItem() + .transformToIterable( + node -> + RootNode.recurseChildren(node, n -> n.target != null).stream() + .map(n -> n.target) + .toList()) + .onItem() + .transformToIterable( + t -> + recordingHelper.listActiveRecordings(t).stream() + .filter(r -> recordings == null || recordings.test(r)) + .toList()) + .onItem() + .transformToUniAndMerge( + r -> { + try { + return recordingHelper.stopRecording(r); + } catch (Exception e) { + logger.warn(e); + throw new ExecutionException(e); + } + }); } - @Blocking @Transactional @Mutation @Description( "Delete an existing Flight Recording matching the given filter, on all Targets under" + " the subtrees of the discovery nodes matching the given filter") - public List deleteRecording( + public Multi deleteRecording( @NonNull DiscoveryNodeFilter nodes, @Nullable ActiveRecordingsFilter recordings) { - var list = - DiscoveryNode.listAll().stream() - .filter(nodes) - .flatMap( - node -> - RootNode.recurseChildren(node, n -> n.target != null) - .stream() - .map(n -> n.target)) - .flatMap( - t -> - recordingHelper.listActiveRecordings(t).stream() - .filter( - r -> - recordings == null - || recordings.test(r))) - .toList(); - for (var r : list) { - recordingHelper.deleteRecording(r).await().atMost(Duration.ofSeconds(10)); - } - return list; + return Multi.createFrom() + .items(DiscoveryNode.listAll().stream()) + .filter(nodes) + .onItem() + .transformToIterable( + node -> + RootNode.recurseChildren(node, n -> n.target != null).stream() + .map(n -> n.target) + .toList()) + .onItem() + .transformToIterable( + t -> + recordingHelper.listActiveRecordings(t).stream() + .filter(r -> recordings == null || recordings.test(r)) + .toList()) + .onItem() + .transformToUniAndMerge(recordingHelper::deleteRecording); } - @Blocking @Transactional @Mutation @Description( "Create a Flight Recorder Snapshot on all Targets under" + " the subtrees of the discovery nodes matching the given filter") - public List createSnapshot(@NonNull DiscoveryNodeFilter nodes) { - var targets = - DiscoveryNode.listAll().stream() - .filter(nodes) - .flatMap( - node -> - RootNode.recurseChildren(node, n -> n.target != null) - .stream() - .map(n -> n.target)) - .toList(); - var snapshots = new ArrayList(); - for (var t : targets) { - snapshots.add(recordingHelper.createSnapshot(t).await().atMost(Duration.ofSeconds(10))); - } - return snapshots; + public Multi createSnapshot(@NonNull DiscoveryNodeFilter nodes) { + return Multi.createFrom() + .items(DiscoveryNode.listAll().stream()) + .filter(nodes) + .onItem() + .transformToIterable( + node -> + RootNode.recurseChildren(node, n -> n.target != null).stream() + .map(n -> n.target) + .toList()) + .onItem() + .transformToUniAndMerge(recordingHelper::createSnapshot); } - @Blocking @Transactional @Description("Start a new Flight Recording on the specified Target") - public ActiveRecording doStartRecording( + public Uni doStartRecording( @Source Target target, @NonNull RecordingSettings recording) throws QuantityConversionException { var fTarget = Target.getTargetById(target.id); Template template = recordingHelper.getPreferredTemplate( fTarget, recording.template, TemplateType.valueOf(recording.templateType)); - return recordingHelper - .startRecording( - fTarget, - Optional.ofNullable(recording.replace) - .map(RecordingReplace::valueOf) - .orElse(RecordingReplace.STOPPED), - template, - recording.asOptions(), - Optional.ofNullable(recording.metadata).map(s -> s.labels).orElse(Map.of())) - .await() - .atMost(timeout); + return recordingHelper.startRecording( + fTarget, + Optional.ofNullable(recording.replace) + .map(RecordingReplace::valueOf) + .orElse(RecordingReplace.STOPPED), + template, + recording.asOptions(), + Optional.ofNullable(recording.metadata).map(s -> s.labels).orElse(Map.of())); } - @Blocking @Transactional @Description("Create a new Flight Recorder Snapshot on the specified Target") - public ActiveRecording doSnapshot(@Source Target target) { + public Uni doSnapshot(@Source Target target) { var fTarget = Target.getTargetById(target.id); - return recordingHelper.createSnapshot(fTarget).await().atMost(timeout); + return recordingHelper.createSnapshot(fTarget); } - @Blocking @Transactional @Description("Stop the specified Flight Recording") - public ActiveRecording doStop(@Source ActiveRecording recording) throws Exception { + public Uni doStop(@Source ActiveRecording recording) throws Exception { var ar = ActiveRecording.find("id", recording.id).singleResult(); - return recordingHelper.stopRecording(ar).await().atMost(timeout); + return recordingHelper.stopRecording(ar); } - @Blocking @Transactional @Description("Delete the specified Flight Recording") - public ActiveRecording doDelete(@Source ActiveRecording recording) { + public Uni doDelete(@Source ActiveRecording recording) { var ar = ActiveRecording.find("id", recording.id).singleResult(); - return recordingHelper.deleteRecording(ar).await().atMost(timeout); + return recordingHelper.deleteRecording(ar); } @Blocking diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index adef23923..6fe8b9f74 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -64,7 +64,7 @@ quarkus.smallrye-openapi.info-license-name=Apache 2.0 quarkus.smallrye-openapi.info-license-url=https://github.com/cryostatio/cryostat3/blob/main/LICENSE quarkus.smallrye-graphql.events.enabled=true -quarkus.smallrye-graphql.nonblocking.enabled=false +quarkus.smallrye-graphql.nonblocking.enabled=true quarkus.smallrye-graphql.root-path=/api/v3/graphql quarkus.smallrye-graphql.http.get.enabled=true quarkus.smallrye-graphql.print-data-fetcher-exception=true From 6dead265f4289ce2bf82b0e3d6cad4c99d240b0c Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Sat, 4 May 2024 08:46:50 -0400 Subject: [PATCH 2/2] return Unis, not Multis --- .../io/cryostat/graphql/ActiveRecordings.java | 39 ++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/cryostat/graphql/ActiveRecordings.java b/src/main/java/io/cryostat/graphql/ActiveRecordings.java index 900ce0879..f0fc3ff39 100644 --- a/src/main/java/io/cryostat/graphql/ActiveRecordings.java +++ b/src/main/java/io/cryostat/graphql/ActiveRecordings.java @@ -68,11 +68,12 @@ public class ActiveRecordings { Duration timeout; @Transactional + @Blocking @Mutation @Description( "Start a new Flight Recording on all Targets under the subtrees of the discovery nodes" + " matching the given filter") - public Multi createRecording( + public Uni> createRecording( @NonNull DiscoveryNodeFilter nodes, @NonNull RecordingSettings recording) throws QuantityConversionException { return Multi.createFrom() @@ -107,15 +108,18 @@ public Multi createRecording( logger.warn(e); throw new ExecutionException(e); } - }); + }) + .collect() + .asList(); } @Transactional + @Blocking @Mutation @Description( "Archive an existing Flight Recording matching the given filter, on all Targets under" + " the subtrees of the discovery nodes matching the given filter") - public Multi archiveRecording( + public Uni> archiveRecording( @NonNull DiscoveryNodeFilter nodes, @Nullable ActiveRecordingsFilter recordings) throws Exception { return Multi.createFrom() @@ -142,15 +146,18 @@ public Multi archiveRecording( logger.warn(e); throw new ExecutionException(e); } - }); + }) + .collect() + .asList(); } @Transactional + @Blocking @Mutation @Description( "Stop an existing Flight Recording matching the given filter, on all Targets under" + " the subtrees of the discovery nodes matching the given filter") - public Multi stopRecording( + public Uni> stopRecording( @NonNull DiscoveryNodeFilter nodes, @Nullable ActiveRecordingsFilter recordings) throws Exception { return Multi.createFrom() @@ -177,15 +184,18 @@ public Multi stopRecording( logger.warn(e); throw new ExecutionException(e); } - }); + }) + .collect() + .asList(); } @Transactional + @Blocking @Mutation @Description( "Delete an existing Flight Recording matching the given filter, on all Targets under" + " the subtrees of the discovery nodes matching the given filter") - public Multi deleteRecording( + public Uni> deleteRecording( @NonNull DiscoveryNodeFilter nodes, @Nullable ActiveRecordingsFilter recordings) { return Multi.createFrom() .items(DiscoveryNode.listAll().stream()) @@ -203,15 +213,18 @@ public Multi deleteRecording( .filter(r -> recordings == null || recordings.test(r)) .toList()) .onItem() - .transformToUniAndMerge(recordingHelper::deleteRecording); + .transformToUniAndMerge(recordingHelper::deleteRecording) + .collect() + .asList(); } @Transactional + @Blocking @Mutation @Description( "Create a Flight Recorder Snapshot on all Targets under" + " the subtrees of the discovery nodes matching the given filter") - public Multi createSnapshot(@NonNull DiscoveryNodeFilter nodes) { + public Uni> createSnapshot(@NonNull DiscoveryNodeFilter nodes) { return Multi.createFrom() .items(DiscoveryNode.listAll().stream()) .filter(nodes) @@ -222,10 +235,13 @@ public Multi createSnapshot(@NonNull DiscoveryNodeFilter nodes) .map(n -> n.target) .toList()) .onItem() - .transformToUniAndMerge(recordingHelper::createSnapshot); + .transformToUniAndMerge(recordingHelper::createSnapshot) + .collect() + .asList(); } @Transactional + @Blocking @Description("Start a new Flight Recording on the specified Target") public Uni doStartRecording( @Source Target target, @NonNull RecordingSettings recording) @@ -245,6 +261,7 @@ public Uni doStartRecording( } @Transactional + @Blocking @Description("Create a new Flight Recorder Snapshot on the specified Target") public Uni doSnapshot(@Source Target target) { var fTarget = Target.getTargetById(target.id); @@ -252,6 +269,7 @@ public Uni doSnapshot(@Source Target target) { } @Transactional + @Blocking @Description("Stop the specified Flight Recording") public Uni doStop(@Source ActiveRecording recording) throws Exception { var ar = ActiveRecording.find("id", recording.id).singleResult(); @@ -259,6 +277,7 @@ public Uni doStop(@Source ActiveRecording recording) throws Exc } @Transactional + @Blocking @Description("Delete the specified Flight Recording") public Uni doDelete(@Source ActiveRecording recording) { var ar = ActiveRecording.find("id", recording.id).singleResult();