From 34db1f17d67e8ac014b6ff92bb31bd04fbd49579 Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Tue, 29 Oct 2024 10:18:03 -0400 Subject: [PATCH 01/12] Add prototype for long running API calls for Archived Recordings --- schema/openapi.yaml | 6 +- src/main/java/io/cryostat/Producers.java | 10 +++ .../cryostat/recordings/ActiveRecordings.java | 46 +++++++--- .../recordings/ArchiveRequestGenerator.java | 86 +++++++++++++++++++ .../cryostat/recordings/RecordingHelper.java | 3 +- 5 files changed, 135 insertions(+), 16 deletions(-) create mode 100644 src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java diff --git a/schema/openapi.yaml b/schema/openapi.yaml index 7ccd335f9..cb979d0bb 100644 --- a/schema/openapi.yaml +++ b/schema/openapi.yaml @@ -2131,7 +2131,7 @@ paths: schema: type: string responses: - "200": + "202": content: text/plain: schema: @@ -2141,6 +2141,10 @@ paths: description: Not Authorized "403": description: Not Allowed + "404": + description: Not Found + "502": + description: Target not Reachable security: - SecurityScheme: [] tags: diff --git a/src/main/java/io/cryostat/Producers.java b/src/main/java/io/cryostat/Producers.java index 035836753..8b94dcd0b 100644 --- a/src/main/java/io/cryostat/Producers.java +++ b/src/main/java/io/cryostat/Producers.java @@ -22,6 +22,7 @@ import io.cryostat.core.reports.InterruptibleReportGenerator; import io.cryostat.libcryostat.sys.Clock; import io.cryostat.libcryostat.sys.FileSystem; +import io.cryostat.recordings.ArchiveRequestGenerator; import io.quarkus.arc.DefaultBean; import io.vertx.mutiny.core.Vertx; @@ -76,6 +77,15 @@ public static InterruptibleReportGenerator produceInterruptibleReportGenerator() singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool()); } + @Produces + @RequestScoped + @DefaultBean + public static ArchiveRequestGenerator produceArchiveRequestGenerator() { + boolean singleThread = Runtime.getRuntime().availableProcessors() < 2; + return new ArchiveRequestGenerator( + singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool()); + } + @Produces @DefaultBean public WebClient produceWebClient(Vertx vertx) { diff --git a/src/main/java/io/cryostat/recordings/ActiveRecordings.java b/src/main/java/io/cryostat/recordings/ActiveRecordings.java index 99520b4bc..ccaee0037 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecordings.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecordings.java @@ -15,7 +15,6 @@ */ package io.cryostat.recordings; -import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.time.Duration; @@ -25,10 +24,12 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.libcryostat.templates.Template; import io.cryostat.libcryostat.templates.TemplateType; +import io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest; import io.cryostat.recordings.RecordingHelper.RecordingOptions; import io.cryostat.recordings.RecordingHelper.RecordingReplace; import io.cryostat.targets.Target; @@ -37,6 +38,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; +import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; import jakarta.transaction.Transactional; @@ -64,6 +66,8 @@ public class ActiveRecordings { @Inject ObjectMapper mapper; @Inject RecordingHelper recordingHelper; + @Inject ArchiveRequestGenerator generator; + @Inject EventBus bus; @Inject Logger logger; @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) @@ -100,7 +104,7 @@ public RestResponse download(@RestPath long targetId, @RestPath lon @Blocking @Path("/{remoteId}") @RolesAllowed("write") - public String patch(@RestPath long targetId, @RestPath long remoteId, String body) + public Uni patch(@RestPath long targetId, @RestPath long remoteId, String body) throws Exception { Target target = Target.find("id", targetId).singleResult(); Optional recording = @@ -119,18 +123,32 @@ public String patch(@RestPath long targetId, @RestPath long remoteId, String bod .atMost(connectionFailedTimeout); return null; case "save": - try { - // FIXME this operation might take a long time to complete, depending on the - // amount of JFR data in the target and the speed of the connection between the - // target and Cryostat. We should not make the client wait until this operation - // completes before sending a response - it should be async. Here we should just - // return an Accepted response, and if a failure occurs that should be indicated - // as a websocket notification. - return recordingHelper.archiveRecording(activeRecording, null, null).name(); - } catch (IOException ioe) { - logger.warn(ioe); - return null; - } + // FIXME this operation might take a long time to complete, depending on the + // amount of JFR data in the target and the speed of the connection between the + // target and Cryostat. We should not make the client wait until this operation + // completes before sending a response - it should be async. Here we should just + // return an Accepted response, and if a failure occurs that should be indicated + // as a websocket notification. + + /* + * Desired workflow: + * Client sends a PATCH request to Cryostat + * Cryostat receives the PATCH and checks that the specified active recording exists and that the target JVM is reachable (ex. try to open a connection and do something relatively lightweight like compute its JVM ID). If this check succeeds respond to the PATCH with 202, if it fails respond with a 404 (recording not found) or 502 (target not reachable) etc. + * In the background, Cryostat creates the S3 file upload request, opens a target connection, pipes the bytes, etc. - same as steps 2-5 above + * Cryostat emits a WebSocket notification, either indicating task successful completion or task failure. + */ + logger.info("Ceating request"); + ArchiveRequest request = + new ArchiveRequest(UUID.randomUUID().toString(), activeRecording); + logger.info( + "Request created: (" + + request.getId() + + ", " + + request.getRecording().name + + ")"); + return Uni.createFrom() + .future(generator.performArchive(request, bus, recordingHelper)); + // return recordingHelper.archiveRecording(activeRecording, null, null).name(); default: throw new BadRequestException(body); } diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java new file mode 100644 index 000000000..1955708f3 --- /dev/null +++ b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java @@ -0,0 +1,86 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.recordings; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import io.cryostat.ws.MessagingServer; +import io.cryostat.ws.Notification; + +import io.vertx.mutiny.core.eventbus.EventBus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ArchiveRequestGenerator { + + private static final String ARCHIVE_RECORDING_SUCCESS = "ArchiveRecordingSuccess"; + private static final String ARCHIVE_RECORDING_FAIL = "ArchiveRecordingFailed"; + private final ExecutorService archiveThread = Executors.newCachedThreadPool(); + private final ExecutorService executor; + private final Logger logger = LoggerFactory.getLogger(getClass()); + + public ArchiveRequestGenerator(ExecutorService executor) { + this.executor = executor; + } + + public Future performArchive( + ArchiveRequest request, EventBus bus, RecordingHelper recordingHelper) { + Objects.requireNonNull(request.getRecording()); + return archiveThread.submit( + () -> { + logger.debug("Job ID: " + request.getId() + " submitted."); + try { + recordingHelper.archiveRecording(request.getRecording(), null, null).name(); + bus.publish( + MessagingServer.class.getName(), + new Notification( + ARCHIVE_RECORDING_SUCCESS, + Map.of("jobId", request.getId()))); + return request.getId(); + } catch (Exception e) { + bus.publish( + MessagingServer.class.getName(), + new Notification( + ARCHIVE_RECORDING_FAIL, Map.of("jobId", request.getId()))); + throw new CompletionException(e); + } + }); + } + + public static class ArchiveRequest { + + private String id; + private ActiveRecording recording; + + public ArchiveRequest(String id, ActiveRecording recording) { + this.id = id; + this.recording = recording; + } + + public String getId() { + return id; + } + + public ActiveRecording getRecording() { + return recording; + } + } +} diff --git a/src/main/java/io/cryostat/recordings/RecordingHelper.java b/src/main/java/io/cryostat/recordings/RecordingHelper.java index dd9912491..c98f1e086 100644 --- a/src/main/java/io/cryostat/recordings/RecordingHelper.java +++ b/src/main/java/io/cryostat/recordings/RecordingHelper.java @@ -905,6 +905,7 @@ public ArchivedRecording archiveRecording( URI connectUrl = recording.target.connectUrl; + /* var event = new ArchivedRecordingEvent( ActiveRecordings.RecordingEventCategory.ARCHIVED_CREATED, @@ -912,7 +913,7 @@ public ArchivedRecording archiveRecording( bus.publish(event.category().category(), event.payload().recording()); bus.publish( MessagingServer.class.getName(), - new Notification(event.category().category(), event.payload())); + new Notification(event.category().category(), event.payload()));*/ } return new ArchivedRecording( recording.target.jvmId, From 4a8857901efd014627ff88738ec4da9e83714e35 Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Wed, 30 Oct 2024 14:17:14 -0400 Subject: [PATCH 02/12] Convert ArchiveRequest to record, add null checks --- .../cryostat/recordings/ArchiveRequestGenerator.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java index 1955708f3..b2b6e2a92 100644 --- a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java +++ b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java @@ -65,14 +65,11 @@ public Future performArchive( }); } - public static class ArchiveRequest { + public record ArchiveRequest(String id, ActiveRecording recording) { - private String id; - private ActiveRecording recording; - - public ArchiveRequest(String id, ActiveRecording recording) { - this.id = id; - this.recording = recording; + public ArchiveRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(recording); } public String getId() { From c81e69c3749752e8acc71b287379d0363c2ce69d Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Wed, 30 Oct 2024 14:23:29 -0400 Subject: [PATCH 03/12] Inject eventbus and recordinghelper to ArchiveRequestGenerator, make it application scoped --- .../java/io/cryostat/recordings/ActiveRecordings.java | 3 +-- .../io/cryostat/recordings/ArchiveRequestGenerator.java | 9 +++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/cryostat/recordings/ActiveRecordings.java b/src/main/java/io/cryostat/recordings/ActiveRecordings.java index ccaee0037..92cdf3cbc 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecordings.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecordings.java @@ -146,8 +146,7 @@ public Uni patch(@RestPath long targetId, @RestPath long remoteId, Strin + ", " + request.getRecording().name + ")"); - return Uni.createFrom() - .future(generator.performArchive(request, bus, recordingHelper)); + return Uni.createFrom().future(generator.performArchive(request)); // return recordingHelper.archiveRecording(activeRecording, null, null).name(); default: throw new BadRequestException(body); diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java index b2b6e2a92..3fd44ec24 100644 --- a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java +++ b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java @@ -26,9 +26,12 @@ import io.cryostat.ws.Notification; import io.vertx.mutiny.core.eventbus.EventBus; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@ApplicationScoped public class ArchiveRequestGenerator { private static final String ARCHIVE_RECORDING_SUCCESS = "ArchiveRecordingSuccess"; @@ -37,12 +40,14 @@ public class ArchiveRequestGenerator { private final ExecutorService executor; private final Logger logger = LoggerFactory.getLogger(getClass()); + @Inject private EventBus bus; + @Inject private RecordingHelper recordingHelper; + public ArchiveRequestGenerator(ExecutorService executor) { this.executor = executor; } - public Future performArchive( - ArchiveRequest request, EventBus bus, RecordingHelper recordingHelper) { + public Future performArchive(ArchiveRequest request) { Objects.requireNonNull(request.getRecording()); return archiveThread.submit( () -> { From 7dbbbc2c6bdd37740f5f53628a9f3e86dce803b2 Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Wed, 30 Oct 2024 15:47:44 -0400 Subject: [PATCH 04/12] Emit event from patch handler to be received by ArchiveRequestGenerator, cleanup --- .../cryostat/recordings/ActiveRecordings.java | 6 +++--- .../recordings/ArchiveRequestGenerator.java | 19 +++++++++++++++---- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/cryostat/recordings/ActiveRecordings.java b/src/main/java/io/cryostat/recordings/ActiveRecordings.java index 92cdf3cbc..021cb48f6 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecordings.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecordings.java @@ -104,7 +104,7 @@ public RestResponse download(@RestPath long targetId, @RestPath lon @Blocking @Path("/{remoteId}") @RolesAllowed("write") - public Uni patch(@RestPath long targetId, @RestPath long remoteId, String body) + public String patch(@RestPath long targetId, @RestPath long remoteId, String body) throws Exception { Target target = Target.find("id", targetId).singleResult(); Optional recording = @@ -146,8 +146,8 @@ public Uni patch(@RestPath long targetId, @RestPath long remoteId, Strin + ", " + request.getRecording().name + ")"); - return Uni.createFrom().future(generator.performArchive(request)); - // return recordingHelper.archiveRecording(activeRecording, null, null).name(); + bus.publish(ArchiveRequestGenerator.ARCHIVE_ADDRESS, request); + return request.getId(); default: throw new BadRequestException(body); } diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java index 3fd44ec24..ad70c5508 100644 --- a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java +++ b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java @@ -19,12 +19,12 @@ import java.util.Objects; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import io.cryostat.ws.MessagingServer; import io.cryostat.ws.Notification; +import io.quarkus.vertx.ConsumeEvent; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -34,9 +34,9 @@ @ApplicationScoped public class ArchiveRequestGenerator { + public static final String ARCHIVE_ADDRESS = "io.cryostat.recordings.ArchiveRequestGenerator"; private static final String ARCHIVE_RECORDING_SUCCESS = "ArchiveRecordingSuccess"; private static final String ARCHIVE_RECORDING_FAIL = "ArchiveRecordingFailed"; - private final ExecutorService archiveThread = Executors.newCachedThreadPool(); private final ExecutorService executor; private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -49,11 +49,12 @@ public ArchiveRequestGenerator(ExecutorService executor) { public Future performArchive(ArchiveRequest request) { Objects.requireNonNull(request.getRecording()); - return archiveThread.submit( + return executor.submit( () -> { - logger.debug("Job ID: " + request.getId() + " submitted."); + logger.info("Job ID: " + request.getId() + " submitted."); try { recordingHelper.archiveRecording(request.getRecording(), null, null).name(); + logger.info("Recording archived, firing notification"); bus.publish( MessagingServer.class.getName(), new Notification( @@ -61,6 +62,7 @@ public Future performArchive(ArchiveRequest request) { Map.of("jobId", request.getId()))); return request.getId(); } catch (Exception e) { + logger.info("Archiving failed"); bus.publish( MessagingServer.class.getName(), new Notification( @@ -70,6 +72,15 @@ public Future performArchive(ArchiveRequest request) { }); } + @ConsumeEvent(value = ARCHIVE_ADDRESS) + public void onMessage(ArchiveRequest request) { + try { + performArchive(request); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + } + } + public record ArchiveRequest(String id, ActiveRecording recording) { public ArchiveRequest { From 36446b2bc9f78cb5996bbdeacc1a7c966691f16a Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Tue, 5 Nov 2024 17:31:08 -0500 Subject: [PATCH 05/12] Add Grafana uploads to prototype --- .../cryostat/recordings/ActiveRecordings.java | 21 ++++- .../recordings/ArchiveRequestGenerator.java | 92 +++++++++++++++++++ .../recordings/ArchivedRecordings.java | 15 ++- 3 files changed, 122 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/cryostat/recordings/ActiveRecordings.java b/src/main/java/io/cryostat/recordings/ActiveRecordings.java index 021cb48f6..3424eab68 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecordings.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecordings.java @@ -30,6 +30,7 @@ import io.cryostat.libcryostat.templates.Template; import io.cryostat.libcryostat.templates.TemplateType; import io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest; +import io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest; import io.cryostat.recordings.RecordingHelper.RecordingOptions; import io.cryostat.recordings.RecordingHelper.RecordingReplace; import io.cryostat.targets.Target; @@ -37,7 +38,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.smallrye.common.annotation.Blocking; -import io.smallrye.mutiny.Uni; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; @@ -237,9 +237,24 @@ public void delete(@RestPath long targetId, @RestPath long remoteId) throws Exce @Blocking @Path("/{remoteId}/upload") @RolesAllowed("write") - public Uni uploadToGrafana(@RestPath long targetId, @RestPath long remoteId) + public String uploadToGrafana(@RestPath long targetId, @RestPath long remoteId) throws Exception { - return recordingHelper.uploadToJFRDatasource(targetId, remoteId); + // Send an intermediate response back to the client while another thread handles the upload + // request + logger.info("Creating grafana upload request"); + GrafanaActiveUploadRequest request = + new GrafanaActiveUploadRequest(UUID.randomUUID().toString(), remoteId, targetId); + logger.info( + "Request created: (" + + request.getId() + + ", " + + request.getRemoteId() + + ", " + + request.getTargetId() + + ")"); + bus.publish(ArchiveRequestGenerator.GRAFANA_ACTIVE_ADDRESS, request); + return request.getId(); + // return recordingHelper.uploadToJFRDatasource(targetId, remoteId); } public record LinkedRecordingDescriptor( diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java index ad70c5508..e7aa9b97f 100644 --- a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java +++ b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java @@ -15,12 +15,14 @@ */ package io.cryostat.recordings; +import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import io.cryostat.ConfigProperties; import io.cryostat.ws.MessagingServer; import io.cryostat.ws.Notification; @@ -28,6 +30,8 @@ import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,14 +39,23 @@ public class ArchiveRequestGenerator { public static final String ARCHIVE_ADDRESS = "io.cryostat.recordings.ArchiveRequestGenerator"; + public static final String GRAFANA_ARCHIVE_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.GrafanaArchiveUploadRequest"; + public static final String GRAFANA_ACTIVE_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest"; private static final String ARCHIVE_RECORDING_SUCCESS = "ArchiveRecordingSuccess"; private static final String ARCHIVE_RECORDING_FAIL = "ArchiveRecordingFailed"; + private static final String GRAFANA_UPLOAD_SUCCESS = "GrafanaUploadSuccess"; + private static final String GRAFANA_UPLOAD_FAIL = "GrafanaUploadFailed"; private final ExecutorService executor; private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject private EventBus bus; @Inject private RecordingHelper recordingHelper; + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration timeout; + public ArchiveRequestGenerator(ExecutorService executor) { this.executor = executor; } @@ -81,6 +94,48 @@ public void onMessage(ArchiveRequest request) { } } + @ConsumeEvent(value = GRAFANA_ARCHIVE_ADDRESS) + public void onMessage(GrafanaArchiveUploadRequest request) { + try { + logger.info("Job ID: " + request.getId() + " submitted."); + String result = + recordingHelper + .uploadToJFRDatasource(request.getPair()) + .await() + .atMost(timeout); + logger.info("Grafana upload complete, firing notification"); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_SUCCESS, Map.of("jobId", request.getId()))); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_FAIL, Map.of("jobId", request.getId()))); + } + } + + @ConsumeEvent(value = GRAFANA_ACTIVE_ADDRESS) + public void onMessage(GrafanaActiveUploadRequest request) { + try { + logger.info("Job ID: " + request.getId() + " submitted."); + String result = + recordingHelper + .uploadToJFRDatasource(request.getTargetId(), request.getRemoteId()) + .await() + .atMost(timeout); + logger.info("Grafana upload complete, firing notification"); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_SUCCESS, Map.of("jobId", request.getId()))); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_FAIL, Map.of("jobId", request.getId()))); + } + } + public record ArchiveRequest(String id, ActiveRecording recording) { public ArchiveRequest { @@ -96,4 +151,41 @@ public ActiveRecording getRecording() { return recording; } } + + public record GrafanaArchiveUploadRequest(String id, Pair pair) { + + public GrafanaArchiveUploadRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(pair); + } + + public String getId() { + return id; + } + + public Pair getPair() { + return pair; + } + } + + public record GrafanaActiveUploadRequest(String id, long remoteId, long targetId) { + + public GrafanaActiveUploadRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(remoteId); + Objects.requireNonNull(targetId); + } + + public String getId() { + return id; + } + + public long getRemoteId() { + return remoteId; + } + + public long getTargetId() { + return targetId; + } + } } diff --git a/src/main/java/io/cryostat/recordings/ArchivedRecordings.java b/src/main/java/io/cryostat/recordings/ArchivedRecordings.java index 8d7302828..464f52f5b 100644 --- a/src/main/java/io/cryostat/recordings/ArchivedRecordings.java +++ b/src/main/java/io/cryostat/recordings/ArchivedRecordings.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.Producers; @@ -34,6 +35,7 @@ import io.cryostat.libcryostat.sys.Clock; import io.cryostat.recordings.ActiveRecording.Listener.ArchivedRecordingEvent; import io.cryostat.recordings.ActiveRecordings.Metadata; +import io.cryostat.recordings.ArchiveRequestGenerator.GrafanaArchiveUploadRequest; import io.cryostat.targets.Target; import io.cryostat.util.HttpMimeType; import io.cryostat.ws.MessagingServer; @@ -42,7 +44,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.runtime.StartupEvent; import io.smallrye.common.annotation.Blocking; -import io.smallrye.mutiny.Uni; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.handler.HttpException; import io.vertx.mutiny.core.eventbus.EventBus; @@ -473,7 +474,7 @@ public void deleteArchivedRecording(@RestPath String jvmId, @RestPath String fil @Blocking @Path("/api/v4/grafana/{encodedKey}") @RolesAllowed("write") - public Uni uploadArchivedToGrafana(@RestPath String encodedKey) throws Exception { + public String uploadArchivedToGrafana(@RestPath String encodedKey) throws Exception { var pair = recordingHelper.decodedKey(encodedKey); var key = recordingHelper.archivedRecordingKey(pair); storage.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build()) @@ -487,7 +488,15 @@ public Uni uploadArchivedToGrafana(@RestPath String encodedKey) throws E if (!found) { throw new NotFoundException(); } - return recordingHelper.uploadToJFRDatasource(pair); + // Send an intermediate response back to the client while another thread handles the upload + // request + logger.info("Creating grafana upload request"); + GrafanaArchiveUploadRequest request = + new GrafanaArchiveUploadRequest(UUID.randomUUID().toString(), pair); + logger.info( + "Request created: (" + request.getId() + ", " + request.getPair().toString() + ")"); + bus.publish(ArchiveRequestGenerator.GRAFANA_ARCHIVE_ADDRESS, request); + return request.getId(); } @GET From 8653f32742201ee9b23b98890e76f3bbbc3b2fbc Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Wed, 6 Nov 2024 15:10:03 -0500 Subject: [PATCH 06/12] Add blocking annotation to event handlers, convert constants to class.getname --- .../cryostat/recordings/ArchiveRequestGenerator.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java index e7aa9b97f..9f379770a 100644 --- a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java +++ b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java @@ -38,11 +38,10 @@ @ApplicationScoped public class ArchiveRequestGenerator { - public static final String ARCHIVE_ADDRESS = "io.cryostat.recordings.ArchiveRequestGenerator"; + public static final String ARCHIVE_ADDRESS = ArchiveRequestGenerator.class.getName(); public static final String GRAFANA_ARCHIVE_ADDRESS = - "io.cryostat.recordings.ArchiveRequestGenerator.GrafanaArchiveUploadRequest"; - public static final String GRAFANA_ACTIVE_ADDRESS = - "io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest"; + GrafanaArchiveUploadRequest.class.getName(); + public static final String GRAFANA_ACTIVE_ADDRESS = GrafanaActiveUploadRequest.class.getName(); private static final String ARCHIVE_RECORDING_SUCCESS = "ArchiveRecordingSuccess"; private static final String ARCHIVE_RECORDING_FAIL = "ArchiveRecordingFailed"; private static final String GRAFANA_UPLOAD_SUCCESS = "GrafanaUploadSuccess"; @@ -94,7 +93,7 @@ public void onMessage(ArchiveRequest request) { } } - @ConsumeEvent(value = GRAFANA_ARCHIVE_ADDRESS) + @ConsumeEvent(value = GRAFANA_ARCHIVE_ADDRESS, blocking = true) public void onMessage(GrafanaArchiveUploadRequest request) { try { logger.info("Job ID: " + request.getId() + " submitted."); @@ -115,7 +114,7 @@ public void onMessage(GrafanaArchiveUploadRequest request) { } } - @ConsumeEvent(value = GRAFANA_ACTIVE_ADDRESS) + @ConsumeEvent(value = GRAFANA_ACTIVE_ADDRESS, blocking = true) public void onMessage(GrafanaActiveUploadRequest request) { try { logger.info("Job ID: " + request.getId() + " submitted."); From f818344c058fcc73fafc250a5c4a1075286d3c8b Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Wed, 6 Nov 2024 16:47:26 -0500 Subject: [PATCH 07/12] Revert class.getName change, add records for Report Requests --- .../recordings/ArchiveRequestGenerator.java | 44 +++++++++++++++++-- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java index 9f379770a..1ddd51820 100644 --- a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java +++ b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java @@ -38,10 +38,16 @@ @ApplicationScoped public class ArchiveRequestGenerator { - public static final String ARCHIVE_ADDRESS = ArchiveRequestGenerator.class.getName(); + public static final String ARCHIVE_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest"; public static final String GRAFANA_ARCHIVE_ADDRESS = - GrafanaArchiveUploadRequest.class.getName(); - public static final String GRAFANA_ACTIVE_ADDRESS = GrafanaActiveUploadRequest.class.getName(); + "io.cryostat.recordings.ArchiveRequestGenerator.GrafanaArchiveUploadRequest"; + public static final String GRAFANA_ACTIVE_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest"; + public static final String ARCHIVE_REPORT_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.ArchiveReportRequest"; + public static final String ACTIVE_REPORT_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.ActiveReportRequest"; private static final String ARCHIVE_RECORDING_SUCCESS = "ArchiveRecordingSuccess"; private static final String ARCHIVE_RECORDING_FAIL = "ArchiveRecordingFailed"; private static final String GRAFANA_UPLOAD_SUCCESS = "GrafanaUploadSuccess"; @@ -187,4 +193,36 @@ public long getTargetId() { return targetId; } } + + public record ArchivedReportRequest(String id, Pair pair) { + + public ArchivedReportRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(pair); + } + + public String getId() { + return id; + } + + public Pair getPair() { + return pair; + } + } + + public record ActiveReportRequest(String id, ActiveRecording recording) { + + public ActiveReportRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(recording); + } + + public String getId() { + return id; + } + + public ActiveRecording getRecording() { + return recording; + } + } } From 126b117b5043726e4a200323e7ab30a9855f6a68 Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Fri, 8 Nov 2024 13:42:32 -0500 Subject: [PATCH 08/12] Fire events in end handler of httpsResponse, add report generation to prototype --- .../cryostat/recordings/ActiveRecordings.java | 17 +++++--- .../recordings/ArchiveRequestGenerator.java | 40 +++++++++++++++++++ .../recordings/ArchivedRecordings.java | 7 +++- .../java/io/cryostat/reports/Reports.java | 35 ++++++++++++---- 4 files changed, 84 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/cryostat/recordings/ActiveRecordings.java b/src/main/java/io/cryostat/recordings/ActiveRecordings.java index 3424eab68..f820fac44 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecordings.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecordings.java @@ -38,6 +38,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.smallrye.common.annotation.Blocking; +import io.vertx.core.http.HttpServerResponse; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; @@ -104,7 +105,11 @@ public RestResponse download(@RestPath long targetId, @RestPath lon @Blocking @Path("/{remoteId}") @RolesAllowed("write") - public String patch(@RestPath long targetId, @RestPath long remoteId, String body) + public String patch( + HttpServerResponse response, + @RestPath long targetId, + @RestPath long remoteId, + String body) throws Exception { Target target = Target.find("id", targetId).singleResult(); Optional recording = @@ -146,7 +151,8 @@ public String patch(@RestPath long targetId, @RestPath long remoteId, String bod + ", " + request.getRecording().name + ")"); - bus.publish(ArchiveRequestGenerator.ARCHIVE_ADDRESS, request); + response.endHandler( + (e) -> bus.publish(ArchiveRequestGenerator.ARCHIVE_ADDRESS, request)); return request.getId(); default: throw new BadRequestException(body); @@ -237,7 +243,8 @@ public void delete(@RestPath long targetId, @RestPath long remoteId) throws Exce @Blocking @Path("/{remoteId}/upload") @RolesAllowed("write") - public String uploadToGrafana(@RestPath long targetId, @RestPath long remoteId) + public String uploadToGrafana( + HttpServerResponse response, @RestPath long targetId, @RestPath long remoteId) throws Exception { // Send an intermediate response back to the client while another thread handles the upload // request @@ -252,9 +259,9 @@ public String uploadToGrafana(@RestPath long targetId, @RestPath long remoteId) + ", " + request.getTargetId() + ")"); - bus.publish(ArchiveRequestGenerator.GRAFANA_ACTIVE_ADDRESS, request); + response.endHandler( + (e) -> bus.publish(ArchiveRequestGenerator.GRAFANA_ACTIVE_ADDRESS, request)); return request.getId(); - // return recordingHelper.uploadToJFRDatasource(targetId, remoteId); } public record LinkedRecordingDescriptor( diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java index 1ddd51820..db7f7230e 100644 --- a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java +++ b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java @@ -23,6 +23,8 @@ import java.util.concurrent.Future; import io.cryostat.ConfigProperties; +import io.cryostat.core.reports.InterruptibleReportGenerator.AnalysisResult; +import io.cryostat.reports.ReportsService; import io.cryostat.ws.MessagingServer; import io.cryostat.ws.Notification; @@ -52,11 +54,14 @@ public class ArchiveRequestGenerator { private static final String ARCHIVE_RECORDING_FAIL = "ArchiveRecordingFailed"; private static final String GRAFANA_UPLOAD_SUCCESS = "GrafanaUploadSuccess"; private static final String GRAFANA_UPLOAD_FAIL = "GrafanaUploadFailed"; + private static final String REPORT_SUCCESS = "ReportSuccess"; + private static final String REPORT_FAILURE = "ReportFailure"; private final ExecutorService executor; private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject private EventBus bus; @Inject private RecordingHelper recordingHelper; + @Inject ReportsService reportsService; @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration timeout; @@ -141,6 +146,41 @@ public void onMessage(GrafanaActiveUploadRequest request) { } } + @ConsumeEvent(value = ACTIVE_REPORT_ADDRESS, blocking = true) + public void onMessage(ActiveReportRequest request) { + try { + logger.info("Job ID: " + request.getId() + " submitted."); + Map result = + reportsService.reportFor(request.getRecording()).await().atMost(timeout); + logger.info("Report generation complete, firing notification"); + bus.publish(MessagingServer.class.getName(), new Notification(REPORT_SUCCESS, result)); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(REPORT_FAILURE, Map.of("jobId", request.getId()))); + } + } + + @ConsumeEvent(value = ARCHIVE_REPORT_ADDRESS, blocking = true) + public void onMessage(ArchivedReportRequest request) { + try { + logger.info("Job ID: " + request.getId() + " submitted."); + Map result = + reportsService + .reportFor(request.getPair().getKey(), request.getPair().getValue()) + .await() + .atMost(timeout); + logger.info("Report generation complete, firing notification"); + bus.publish(MessagingServer.class.getName(), new Notification(REPORT_SUCCESS, result)); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(REPORT_FAILURE, Map.of("jobId", request.getId()))); + } + } + public record ArchiveRequest(String id, ActiveRecording recording) { public ArchiveRequest { diff --git a/src/main/java/io/cryostat/recordings/ArchivedRecordings.java b/src/main/java/io/cryostat/recordings/ArchivedRecordings.java index 464f52f5b..b594811cf 100644 --- a/src/main/java/io/cryostat/recordings/ArchivedRecordings.java +++ b/src/main/java/io/cryostat/recordings/ArchivedRecordings.java @@ -44,6 +44,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.runtime.StartupEvent; import io.smallrye.common.annotation.Blocking; +import io.vertx.core.http.HttpServerResponse; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.handler.HttpException; import io.vertx.mutiny.core.eventbus.EventBus; @@ -474,7 +475,8 @@ public void deleteArchivedRecording(@RestPath String jvmId, @RestPath String fil @Blocking @Path("/api/v4/grafana/{encodedKey}") @RolesAllowed("write") - public String uploadArchivedToGrafana(@RestPath String encodedKey) throws Exception { + public String uploadArchivedToGrafana(HttpServerResponse response, @RestPath String encodedKey) + throws Exception { var pair = recordingHelper.decodedKey(encodedKey); var key = recordingHelper.archivedRecordingKey(pair); storage.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build()) @@ -495,7 +497,8 @@ public String uploadArchivedToGrafana(@RestPath String encodedKey) throws Except new GrafanaArchiveUploadRequest(UUID.randomUUID().toString(), pair); logger.info( "Request created: (" + request.getId() + ", " + request.getPair().toString() + ")"); - bus.publish(ArchiveRequestGenerator.GRAFANA_ARCHIVE_ADDRESS, request); + response.endHandler( + (e) -> bus.publish(ArchiveRequestGenerator.GRAFANA_ARCHIVE_ADDRESS, request)); return request.getId(); } diff --git a/src/main/java/io/cryostat/reports/Reports.java b/src/main/java/io/cryostat/reports/Reports.java index 5fd7e7e0f..903a38b37 100644 --- a/src/main/java/io/cryostat/reports/Reports.java +++ b/src/main/java/io/cryostat/reports/Reports.java @@ -15,17 +15,20 @@ */ package io.cryostat.reports; -import java.util.Map; +import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.StorageBuckets; -import io.cryostat.core.reports.InterruptibleReportGenerator.AnalysisResult; +import io.cryostat.recordings.ArchiveRequestGenerator; +import io.cryostat.recordings.ArchiveRequestGenerator.ActiveReportRequest; +import io.cryostat.recordings.ArchiveRequestGenerator.ArchivedReportRequest; import io.cryostat.recordings.RecordingHelper; import io.cryostat.targets.Target; import io.quarkus.runtime.StartupEvent; import io.smallrye.common.annotation.Blocking; -import io.smallrye.mutiny.Uni; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.security.RolesAllowed; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; @@ -50,6 +53,7 @@ public class Reports { @Inject StorageBuckets storageBuckets; @Inject RecordingHelper helper; @Inject ReportsService reportsService; + @Inject EventBus bus; @Inject Logger logger; // FIXME this observer cannot be declared on the StorageCachingReportsService decorator. @@ -65,10 +69,16 @@ void onStart(@Observes StartupEvent evt) { @Path("/api/v4/reports/{encodedKey}") @Produces(MediaType.APPLICATION_JSON) @RolesAllowed("read") - public Uni> get(@RestPath String encodedKey) { + public String get(HttpServerResponse response, @RestPath String encodedKey) { // TODO implement query parameter for evaluation predicate + logger.info("Creating archived reports request"); var pair = helper.decodedKey(encodedKey); - return reportsService.reportFor(pair.getKey(), pair.getValue()); + ArchivedReportRequest request = + new ArchivedReportRequest(UUID.randomUUID().toString(), pair); + response.endHandler( + (e) -> bus.publish(ArchiveRequestGenerator.ARCHIVE_REPORT_ADDRESS, request)); + return request.getId(); + // return reportsService.reportFor(pair.getKey(), pair.getValue()); } @GET @@ -76,14 +86,23 @@ public Uni> get(@RestPath String encodedKey) { @Path("/api/v4/targets/{targetId}/reports/{recordingId}") @Produces({MediaType.APPLICATION_JSON}) @RolesAllowed("read") - public Uni> getActive( - @RestPath long targetId, @RestPath long recordingId) throws Exception { + public String getActive( + HttpServerResponse response, @RestPath long targetId, @RestPath long recordingId) + throws Exception { + + logger.info("Creating active reports request"); var target = Target.getTargetById(targetId); var recording = target.getRecordingById(recordingId); if (recording == null) { throw new NotFoundException(); } + + ActiveReportRequest request = + new ActiveReportRequest(UUID.randomUUID().toString(), recording); + response.endHandler( + (e) -> bus.publish(ArchiveRequestGenerator.ACTIVE_REPORT_ADDRESS, request)); // TODO implement query parameter for evaluation predicate - return reportsService.reportFor(recording); + return request.getId(); + // return reportsService.reportFor(recording); } } From 55467c15c7529a38a336d53e744ef39bb79acb4e Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Fri, 8 Nov 2024 14:03:35 -0500 Subject: [PATCH 09/12] Add retrieval mechanism for report results, update openapi schema --- schema/openapi.yaml | 37 +++++++++++++++---- .../recordings/ArchiveRequestGenerator.java | 10 +++++ .../java/io/cryostat/reports/Reports.java | 13 +++++++ 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/schema/openapi.yaml b/schema/openapi.yaml index cb979d0bb..520906c48 100644 --- a/schema/openapi.yaml +++ b/schema/openapi.yaml @@ -1226,7 +1226,7 @@ paths: schema: type: string responses: - "200": + "202": content: text/plain: schema: @@ -1456,6 +1456,29 @@ paths: required: true schema: type: string + responses: + "202": + content: + text/plain: + schema: + type: string + description: OK + "401": + description: Not Authorized + "403": + description: Not Allowed + security: + - SecurityScheme: [] + tags: + - Reports + /api/v4/jobs/{jobId}: + get: + parameters: + - in: path + name: jobId + required: true + schema: + type: string responses: "200": content: @@ -1472,7 +1495,7 @@ paths: security: - SecurityScheme: [] tags: - - Reports + - Jobs /api/v4/rules: get: responses: @@ -2165,7 +2188,7 @@ paths: format: int64 type: integer responses: - "200": + "202": content: text/plain: schema: @@ -2195,13 +2218,11 @@ paths: format: int64 type: integer responses: - "200": + "202": content: - application/json: + text/plain: schema: - additionalProperties: - $ref: '#/components/schemas/AnalysisResult' - type: object + type: string description: OK "401": description: Not Authorized diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java index db7f7230e..6ff4d341b 100644 --- a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java +++ b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -63,11 +64,14 @@ public class ArchiveRequestGenerator { @Inject private RecordingHelper recordingHelper; @Inject ReportsService reportsService; + private Map> jobResults; + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration timeout; public ArchiveRequestGenerator(ExecutorService executor) { this.executor = executor; + this.jobResults = new ConcurrentHashMap<>(); } public Future performArchive(ArchiveRequest request) { @@ -95,6 +99,10 @@ public Future performArchive(ArchiveRequest request) { }); } + public Map getAnalysisResult(String jobID) { + return jobResults.get(jobID); + } + @ConsumeEvent(value = ARCHIVE_ADDRESS) public void onMessage(ArchiveRequest request) { try { @@ -153,6 +161,7 @@ public void onMessage(ActiveReportRequest request) { Map result = reportsService.reportFor(request.getRecording()).await().atMost(timeout); logger.info("Report generation complete, firing notification"); + jobResults.put(request.getId(), result); bus.publish(MessagingServer.class.getName(), new Notification(REPORT_SUCCESS, result)); } catch (Exception e) { logger.warn("Exception thrown while servicing request: ", e); @@ -172,6 +181,7 @@ public void onMessage(ArchivedReportRequest request) { .await() .atMost(timeout); logger.info("Report generation complete, firing notification"); + jobResults.put(request.getId(), result); bus.publish(MessagingServer.class.getName(), new Notification(REPORT_SUCCESS, result)); } catch (Exception e) { logger.warn("Exception thrown while servicing request: ", e); diff --git a/src/main/java/io/cryostat/reports/Reports.java b/src/main/java/io/cryostat/reports/Reports.java index 903a38b37..4a8056927 100644 --- a/src/main/java/io/cryostat/reports/Reports.java +++ b/src/main/java/io/cryostat/reports/Reports.java @@ -15,10 +15,12 @@ */ package io.cryostat.reports; +import java.util.Map; import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.StorageBuckets; +import io.cryostat.core.reports.InterruptibleReportGenerator.AnalysisResult; import io.cryostat.recordings.ArchiveRequestGenerator; import io.cryostat.recordings.ArchiveRequestGenerator.ActiveReportRequest; import io.cryostat.recordings.ArchiveRequestGenerator.ArchivedReportRequest; @@ -50,6 +52,7 @@ public class Reports { @ConfigProperty(name = ConfigProperties.ARCHIVED_REPORTS_STORAGE_CACHE_NAME) String bucket; + @Inject ArchiveRequestGenerator generator; @Inject StorageBuckets storageBuckets; @Inject RecordingHelper helper; @Inject ReportsService reportsService; @@ -64,6 +67,16 @@ void onStart(@Observes StartupEvent evt) { } } + @GET + @Blocking + @Path("/api/v4/jobs/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + @RolesAllowed("read") + public Map getFromJobId(@RestPath String jobId) { + logger.info("Retrieving result for Job ID: " + jobId); + return generator.getAnalysisResult(jobId); + } + @GET @Blocking @Path("/api/v4/reports/{encodedKey}") From 62ee82dc3d7670519efdf47a6c1c99db468e9083 Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Mon, 18 Nov 2024 18:06:46 -0500 Subject: [PATCH 10/12] Rework reports endpoint to check if the requested report is cached --- schema/openapi.yaml | 27 +++---- .../reports/MemoryCachingReportsService.java | 14 ++++ .../java/io/cryostat/reports/Reports.java | 70 +++++++++++++------ .../io/cryostat/reports/ReportsService.java | 4 ++ .../cryostat/reports/ReportsServiceImpl.java | 10 +++ .../reports/StorageCachingReportsService.java | 15 ++++ 6 files changed, 101 insertions(+), 39 deletions(-) diff --git a/schema/openapi.yaml b/schema/openapi.yaml index 520906c48..4b168c992 100644 --- a/schema/openapi.yaml +++ b/schema/openapi.yaml @@ -1463,23 +1463,6 @@ paths: schema: type: string description: OK - "401": - description: Not Authorized - "403": - description: Not Allowed - security: - - SecurityScheme: [] - tags: - - Reports - /api/v4/jobs/{jobId}: - get: - parameters: - - in: path - name: jobId - required: true - schema: - type: string - responses: "200": content: application/json: @@ -1495,7 +1478,7 @@ paths: security: - SecurityScheme: [] tags: - - Jobs + - Reports /api/v4/rules: get: responses: @@ -2218,6 +2201,14 @@ paths: format: int64 type: integer responses: + "200": + content: + application/json: + schema: + additionalProperties: + $ref: '#/components/schemas/AnalysisResult' + type: object + description: OK "202": content: text/plain: diff --git a/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java b/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java index 433538ff2..21a89c1f7 100644 --- a/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java +++ b/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java @@ -16,6 +16,7 @@ package io.cryostat.reports; import java.util.Map; +import java.util.Objects; import java.util.function.Predicate; import org.openjdk.jmc.flightrecorder.rules.IRule; @@ -27,6 +28,7 @@ import io.quarkus.cache.Cache; import io.quarkus.cache.CacheName; +import io.quarkus.cache.CaffeineCache; import io.smallrye.mutiny.Uni; import jakarta.annotation.Priority; import jakarta.decorator.Decorator; @@ -108,4 +110,16 @@ public Uni> reportFor(ActiveRecording recording) { public Uni> reportFor(String jvmId, String filename) { return reportFor(jvmId, filename, r -> true); } + + @Override + public boolean keyExists(ActiveRecording recording) { + String key = ReportsService.key(recording); + return !Objects.isNull(activeCache.as(CaffeineCache.class).getIfPresent(key)); + } + + @Override + public boolean keyExists(String jvmId, String filename) { + String key = recordingHelper.archivedRecordingKey(jvmId, filename); + return !Objects.isNull(archivedCache.as(CaffeineCache.class).getIfPresent(key)); + } } diff --git a/src/main/java/io/cryostat/reports/Reports.java b/src/main/java/io/cryostat/reports/Reports.java index 4a8056927..bc292e6de 100644 --- a/src/main/java/io/cryostat/reports/Reports.java +++ b/src/main/java/io/cryostat/reports/Reports.java @@ -15,12 +15,11 @@ */ package io.cryostat.reports; -import java.util.Map; +import java.time.Duration; import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.StorageBuckets; -import io.cryostat.core.reports.InterruptibleReportGenerator.AnalysisResult; import io.cryostat.recordings.ArchiveRequestGenerator; import io.cryostat.recordings.ArchiveRequestGenerator.ActiveReportRequest; import io.cryostat.recordings.ArchiveRequestGenerator.ArchivedReportRequest; @@ -39,6 +38,7 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.RestPath; @@ -52,6 +52,9 @@ public class Reports { @ConfigProperty(name = ConfigProperties.ARCHIVED_REPORTS_STORAGE_CACHE_NAME) String bucket; + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration timeout; + @Inject ArchiveRequestGenerator generator; @Inject StorageBuckets storageBuckets; @Inject RecordingHelper helper; @@ -67,31 +70,41 @@ void onStart(@Observes StartupEvent evt) { } } - @GET - @Blocking - @Path("/api/v4/jobs/{jobId}") - @Produces(MediaType.APPLICATION_JSON) - @RolesAllowed("read") - public Map getFromJobId(@RestPath String jobId) { - logger.info("Retrieving result for Job ID: " + jobId); - return generator.getAnalysisResult(jobId); - } - @GET @Blocking @Path("/api/v4/reports/{encodedKey}") @Produces(MediaType.APPLICATION_JSON) @RolesAllowed("read") - public String get(HttpServerResponse response, @RestPath String encodedKey) { + // Response isn't strongly typed which allows us to return either the Analysis result + // or a job ID String along with setting different Status codes. + // TODO: Is there a cleaner way to accomplish this? + public Response get(HttpServerResponse response, @RestPath String encodedKey) { // TODO implement query parameter for evaluation predicate - logger.info("Creating archived reports request"); var pair = helper.decodedKey(encodedKey); + + // Check if we have a cached result already for this report + if (reportsService.keyExists(pair.getKey(), pair.getValue())) { + return Response.ok( + reportsService + .reportFor(pair.getKey(), pair.getValue()) + .await() + .atMost(timeout), + MediaType.APPLICATION_JSON) + .status(200) + .build(); + } + + // If we don't have a cached result, delegate to the ArchiveRequestGenerator + // and return the job ID with a location header. + logger.info("Cache miss. Creating archived reports request"); ArchivedReportRequest request = new ArchivedReportRequest(UUID.randomUUID().toString(), pair); response.endHandler( (e) -> bus.publish(ArchiveRequestGenerator.ARCHIVE_REPORT_ADDRESS, request)); - return request.getId(); - // return reportsService.reportFor(pair.getKey(), pair.getValue()); + return Response.ok(request.getId()) + .status(202) + .header("Location", "/api/v4/reports/" + encodedKey) + .build(); } @GET @@ -99,23 +112,38 @@ public String get(HttpServerResponse response, @RestPath String encodedKey) { @Path("/api/v4/targets/{targetId}/reports/{recordingId}") @Produces({MediaType.APPLICATION_JSON}) @RolesAllowed("read") - public String getActive( + // Response isn't strongly typed which allows us to return either the Analysis result + // or a job ID String along with setting different Status codes. + // TODO: Is there a cleaner way to accomplish this? + public Response getActive( HttpServerResponse response, @RestPath long targetId, @RestPath long recordingId) throws Exception { - - logger.info("Creating active reports request"); var target = Target.getTargetById(targetId); var recording = target.getRecordingById(recordingId); if (recording == null) { throw new NotFoundException(); } + // Check if we've already cached a result for this report, return it if so + if (reportsService.keyExists(recording)) { + return Response.ok(reportsService.reportFor(recording).await().atMost(timeout)) + .status(200) + .build(); + } + + // If there isn't a cached result available, delegate to the ArchiveRequestGenerator + // and return the job ID with a location header. + logger.info("Cache miss. Creating active reports request"); ActiveReportRequest request = new ActiveReportRequest(UUID.randomUUID().toString(), recording); response.endHandler( (e) -> bus.publish(ArchiveRequestGenerator.ACTIVE_REPORT_ADDRESS, request)); // TODO implement query parameter for evaluation predicate - return request.getId(); - // return reportsService.reportFor(recording); + return Response.ok(request.getId()) + .status(202) + .header( + "Location", + "/api/v4/targets/" + target.targetId() + "/reports/" + recordingId) + .build(); } } diff --git a/src/main/java/io/cryostat/reports/ReportsService.java b/src/main/java/io/cryostat/reports/ReportsService.java index 14a22764d..4d982d45b 100644 --- a/src/main/java/io/cryostat/reports/ReportsService.java +++ b/src/main/java/io/cryostat/reports/ReportsService.java @@ -39,4 +39,8 @@ Uni> reportFor( static String key(ActiveRecording recording) { return String.format("%s/%d", recording.target.jvmId, recording.id); } + + public boolean keyExists(ActiveRecording recording); + + public boolean keyExists(String jvmId, String filename); } diff --git a/src/main/java/io/cryostat/reports/ReportsServiceImpl.java b/src/main/java/io/cryostat/reports/ReportsServiceImpl.java index a11f0ec10..fe449c2ec 100644 --- a/src/main/java/io/cryostat/reports/ReportsServiceImpl.java +++ b/src/main/java/io/cryostat/reports/ReportsServiceImpl.java @@ -117,4 +117,14 @@ public ReportGenerationException(Throwable cause) { super(cause); } } + + @Override + public boolean keyExists(ActiveRecording recording) { + return false; + } + + @Override + public boolean keyExists(String jvmId, String filename) { + return false; + } } diff --git a/src/main/java/io/cryostat/reports/StorageCachingReportsService.java b/src/main/java/io/cryostat/reports/StorageCachingReportsService.java index 1a490960f..3ece7a152 100644 --- a/src/main/java/io/cryostat/reports/StorageCachingReportsService.java +++ b/src/main/java/io/cryostat/reports/StorageCachingReportsService.java @@ -64,6 +64,9 @@ class StorageCachingReportsService implements ReportsService { @ConfigProperty(name = ConfigProperties.ARCHIVED_REPORTS_EXPIRY_DURATION) Duration expiry; + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration timeout; + @Inject S3Client storage; @Inject RecordingHelper recordingHelper; @Inject ObjectMapper mapper; @@ -165,4 +168,16 @@ public Uni> reportFor(ActiveRecording recording) { public Uni> reportFor(String jvmId, String filename) { return reportFor(jvmId, filename, r -> true); } + + @Override + public boolean keyExists(ActiveRecording recording) { + String key = ReportsService.key(recording); + return checkStorage(key).await().atMost(timeout); + } + + @Override + public boolean keyExists(String jvmId, String filename) { + String key = recordingHelper.archivedRecordingKey(jvmId, filename); + return checkStorage(key).await().atMost(timeout); + } } From e0c25d22d95a747faa0084dd06799d5b335deb18 Mon Sep 17 00:00:00 2001 From: jmatsuok Date: Thu, 21 Nov 2024 16:12:59 -0500 Subject: [PATCH 11/12] Chang endHandler to bodyEndHandler, fix cache detection --- .../reports/MemoryCachingReportsService.java | 6 ++-- .../java/io/cryostat/reports/Reports.java | 30 ++++++++++++------- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java b/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java index 21a89c1f7..fbc55895e 100644 --- a/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java +++ b/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java @@ -114,12 +114,14 @@ public Uni> reportFor(String jvmId, String filename) @Override public boolean keyExists(ActiveRecording recording) { String key = ReportsService.key(recording); - return !Objects.isNull(activeCache.as(CaffeineCache.class).getIfPresent(key)); + return !Objects.isNull(activeCache.as(CaffeineCache.class).getIfPresent(key)) + && !delegate.keyExists(recording); } @Override public boolean keyExists(String jvmId, String filename) { String key = recordingHelper.archivedRecordingKey(jvmId, filename); - return !Objects.isNull(archivedCache.as(CaffeineCache.class).getIfPresent(key)); + return !Objects.isNull(archivedCache.as(CaffeineCache.class).getIfPresent(key)) + && !delegate.keyExists(jvmId, filename); } } diff --git a/src/main/java/io/cryostat/reports/Reports.java b/src/main/java/io/cryostat/reports/Reports.java index bc292e6de..c0809f9a2 100644 --- a/src/main/java/io/cryostat/reports/Reports.java +++ b/src/main/java/io/cryostat/reports/Reports.java @@ -36,9 +36,9 @@ import jakarta.ws.rs.GET; import jakarta.ws.rs.NotFoundException; import jakarta.ws.rs.Path; -import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.RestPath; @@ -73,7 +73,6 @@ void onStart(@Observes StartupEvent evt) { @GET @Blocking @Path("/api/v4/reports/{encodedKey}") - @Produces(MediaType.APPLICATION_JSON) @RolesAllowed("read") // Response isn't strongly typed which allows us to return either the Analysis result // or a job ID String along with setting different Status codes. @@ -99,18 +98,22 @@ public Response get(HttpServerResponse response, @RestPath String encodedKey) { logger.info("Cache miss. Creating archived reports request"); ArchivedReportRequest request = new ArchivedReportRequest(UUID.randomUUID().toString(), pair); - response.endHandler( + response.bodyEndHandler( (e) -> bus.publish(ArchiveRequestGenerator.ARCHIVE_REPORT_ADDRESS, request)); return Response.ok(request.getId()) .status(202) - .header("Location", "/api/v4/reports/" + encodedKey) + .location( + UriBuilder.fromUri( + String.format( + "/api/v4/targets/%d/reports/%d", + pair.getLeft(), pair.getRight())) + .build()) .build(); } @GET @Blocking @Path("/api/v4/targets/{targetId}/reports/{recordingId}") - @Produces({MediaType.APPLICATION_JSON}) @RolesAllowed("read") // Response isn't strongly typed which allows us to return either the Analysis result // or a job ID String along with setting different Status codes. @@ -136,14 +139,21 @@ public Response getActive( logger.info("Cache miss. Creating active reports request"); ActiveReportRequest request = new ActiveReportRequest(UUID.randomUUID().toString(), recording); - response.endHandler( - (e) -> bus.publish(ArchiveRequestGenerator.ACTIVE_REPORT_ADDRESS, request)); + // This doesn't get fired + response.bodyEndHandler( + (e) -> { + logger.info("Did we get here? Firing Active report event"); + bus.publish(ArchiveRequestGenerator.ACTIVE_REPORT_ADDRESS, request); + }); // TODO implement query parameter for evaluation predicate return Response.ok(request.getId()) .status(202) - .header( - "Location", - "/api/v4/targets/" + target.targetId() + "/reports/" + recordingId) + .location( + UriBuilder.fromUri( + String.format( + "/api/v4/targets/%d/reports/%d", + target.id, recordingId)) + .build()) .build(); } } From 026bd66bbeaddb9f161ca09ec4702f55325b2f58 Mon Sep 17 00:00:00 2001 From: Cryostat CI Date: Thu, 21 Nov 2024 22:33:24 +0000 Subject: [PATCH 12/12] chore(schema): automatic update --- schema/openapi.yaml | 102 +++++++++++++++----------------------------- 1 file changed, 35 insertions(+), 67 deletions(-) diff --git a/schema/openapi.yaml b/schema/openapi.yaml index 4b168c992..8c8c378a7 100644 --- a/schema/openapi.yaml +++ b/schema/openapi.yaml @@ -1,18 +1,6 @@ --- components: schemas: - AnalysisResult: - properties: - evaluation: - $ref: '#/components/schemas/Evaluation' - name: - type: string - score: - format: double - type: number - topic: - type: string - type: object Annotations: properties: cryostat: @@ -239,19 +227,6 @@ components: - id - realm type: object - Evaluation: - properties: - explanation: - type: string - solution: - type: string - suggestions: - items: - $ref: '#/components/schemas/Suggestion' - type: array - summary: - type: string - type: object Event: properties: clazz: @@ -309,6 +284,16 @@ components: hash: type: string type: object + HttpServerResponse: + properties: + chunked: + type: boolean + statusCode: + format: int32 + type: integer + statusMessage: + type: string + type: object Instant: example: 2022-03-10T16:15:50Z format: date-time @@ -554,15 +539,6 @@ components: name: type: string type: object - Suggestion: - properties: - name: - type: string - setting: - type: string - value: - type: string - type: object Target: properties: agent: @@ -1225,8 +1201,13 @@ paths: required: true schema: type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: - "202": + "200": content: text/plain: schema: @@ -1456,20 +1437,13 @@ paths: required: true schema: type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: - "202": - content: - text/plain: - schema: - type: string - description: OK "200": - content: - application/json: - schema: - additionalProperties: - $ref: '#/components/schemas/AnalysisResult' - type: object description: OK "401": description: Not Authorized @@ -2133,11 +2107,11 @@ paths: type: integer requestBody: content: - text/plain: + application/json: schema: - type: string + $ref: '#/components/schemas/HttpServerResponse' responses: - "202": + "200": content: text/plain: schema: @@ -2147,10 +2121,6 @@ paths: description: Not Authorized "403": description: Not Allowed - "404": - description: Not Found - "502": - description: Target not Reachable security: - SecurityScheme: [] tags: @@ -2170,8 +2140,13 @@ paths: schema: format: int64 type: integer + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: - "202": + "200": content: text/plain: schema: @@ -2200,20 +2175,13 @@ paths: schema: format: int64 type: integer + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: "200": - content: - application/json: - schema: - additionalProperties: - $ref: '#/components/schemas/AnalysisResult' - type: object - description: OK - "202": - content: - text/plain: - schema: - type: string description: OK "401": description: Not Authorized