diff --git a/schema/openapi.yaml b/schema/openapi.yaml index 7ccd335f9..8c8c378a7 100644 --- a/schema/openapi.yaml +++ b/schema/openapi.yaml @@ -1,18 +1,6 @@ --- components: schemas: - AnalysisResult: - properties: - evaluation: - $ref: '#/components/schemas/Evaluation' - name: - type: string - score: - format: double - type: number - topic: - type: string - type: object Annotations: properties: cryostat: @@ -239,19 +227,6 @@ components: - id - realm type: object - Evaluation: - properties: - explanation: - type: string - solution: - type: string - suggestions: - items: - $ref: '#/components/schemas/Suggestion' - type: array - summary: - type: string - type: object Event: properties: clazz: @@ -309,6 +284,16 @@ components: hash: type: string type: object + HttpServerResponse: + properties: + chunked: + type: boolean + statusCode: + format: int32 + type: integer + statusMessage: + type: string + type: object Instant: example: 2022-03-10T16:15:50Z format: date-time @@ -554,15 +539,6 @@ components: name: type: string type: object - Suggestion: - properties: - name: - type: string - setting: - type: string - value: - type: string - type: object Target: properties: agent: @@ -1225,6 +1201,11 @@ paths: required: true schema: type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: "200": content: @@ -1456,14 +1437,13 @@ paths: required: true schema: type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: "200": - content: - application/json: - schema: - additionalProperties: - $ref: '#/components/schemas/AnalysisResult' - type: object description: OK "401": description: Not Authorized @@ -2127,9 +2107,9 @@ paths: type: integer requestBody: content: - text/plain: + application/json: schema: - type: string + $ref: '#/components/schemas/HttpServerResponse' responses: "200": content: @@ -2160,6 +2140,11 @@ paths: schema: format: 
int64 type: integer + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: "200": content: @@ -2190,14 +2175,13 @@ paths: schema: format: int64 type: integer + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: "200": - content: - application/json: - schema: - additionalProperties: - $ref: '#/components/schemas/AnalysisResult' - type: object description: OK "401": description: Not Authorized diff --git a/src/main/java/io/cryostat/Producers.java b/src/main/java/io/cryostat/Producers.java index 035836753..8b94dcd0b 100644 --- a/src/main/java/io/cryostat/Producers.java +++ b/src/main/java/io/cryostat/Producers.java @@ -22,6 +22,7 @@ import io.cryostat.core.reports.InterruptibleReportGenerator; import io.cryostat.libcryostat.sys.Clock; import io.cryostat.libcryostat.sys.FileSystem; +import io.cryostat.recordings.ArchiveRequestGenerator; import io.quarkus.arc.DefaultBean; import io.vertx.mutiny.core.Vertx; @@ -76,6 +77,15 @@ public static InterruptibleReportGenerator produceInterruptibleReportGenerator() singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool()); } + @Produces + @RequestScoped + @DefaultBean + public static ArchiveRequestGenerator produceArchiveRequestGenerator() { + boolean singleThread = Runtime.getRuntime().availableProcessors() < 2; + return new ArchiveRequestGenerator( + singleThread ? 
Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool()); + } + @Produces @DefaultBean public WebClient produceWebClient(Vertx vertx) { diff --git a/src/main/java/io/cryostat/recordings/ActiveRecordings.java b/src/main/java/io/cryostat/recordings/ActiveRecordings.java index 99520b4bc..f820fac44 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecordings.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecordings.java @@ -15,7 +15,6 @@ */ package io.cryostat.recordings; -import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.time.Duration; @@ -25,10 +24,13 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.libcryostat.templates.Template; import io.cryostat.libcryostat.templates.TemplateType; +import io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest; +import io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest; import io.cryostat.recordings.RecordingHelper.RecordingOptions; import io.cryostat.recordings.RecordingHelper.RecordingReplace; import io.cryostat.targets.Target; @@ -36,7 +38,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.smallrye.common.annotation.Blocking; -import io.smallrye.mutiny.Uni; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; import jakarta.transaction.Transactional; @@ -64,6 +67,8 @@ public class ActiveRecordings { @Inject ObjectMapper mapper; @Inject RecordingHelper recordingHelper; + @Inject ArchiveRequestGenerator generator; + @Inject EventBus bus; @Inject Logger logger; @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) @@ -100,7 +105,11 @@ public RestResponse download(@RestPath long targetId, @RestPath lon @Blocking 
@Path("/{remoteId}") @RolesAllowed("write") - public String patch(@RestPath long targetId, @RestPath long remoteId, String body) + public String patch( + HttpServerResponse response, + @RestPath long targetId, + @RestPath long remoteId, + String body) throws Exception { Target target = Target.find("id", targetId).singleResult(); Optional recording = @@ -119,18 +128,32 @@ public String patch(@RestPath long targetId, @RestPath long remoteId, String bod .atMost(connectionFailedTimeout); return null; case "save": - try { - // FIXME this operation might take a long time to complete, depending on the - // amount of JFR data in the target and the speed of the connection between the - // target and Cryostat. We should not make the client wait until this operation - // completes before sending a response - it should be async. Here we should just - // return an Accepted response, and if a failure occurs that should be indicated - // as a websocket notification. - return recordingHelper.archiveRecording(activeRecording, null, null).name(); - } catch (IOException ioe) { - logger.warn(ioe); - return null; - } + // FIXME this operation might take a long time to complete, depending on the + // amount of JFR data in the target and the speed of the connection between the + // target and Cryostat. We should not make the client wait until this operation + // completes before sending a response - it should be async. Here we should just + // return an Accepted response, and if a failure occurs that should be indicated + // as a websocket notification. + + /* + * Desired workflow: + * Client sends a PATCH request to Cryostat + * Cryostat receives the PATCH and checks that the specified active recording exists and that the target JVM is reachable (ex. try to open a connection and do something relatively lightweight like compute its JVM ID). If this check succeeds respond to the PATCH with 202, if it fails respond with a 404 (recording not found) or 502 (target not reachable) etc. 
+ * In the background, Cryostat creates the S3 file upload request, opens a target connection, pipes the bytes, etc. - same as steps 2-5 above + * Cryostat emits a WebSocket notification, either indicating task successful completion or task failure. + */ + logger.info("Creating request"); + ArchiveRequest request = + new ArchiveRequest(UUID.randomUUID().toString(), activeRecording); + logger.info( + "Request created: (" + + request.getId() + + ", " + + request.getRecording().name + + ")"); + response.endHandler( + (e) -> bus.publish(ArchiveRequestGenerator.ARCHIVE_ADDRESS, request)); + return request.getId(); default: throw new BadRequestException(body); } @@ -220,9 +243,25 @@ public void delete(@RestPath long targetId, @RestPath long remoteId) throws Exce @Blocking @Path("/{remoteId}/upload") @RolesAllowed("write") - public Uni uploadToGrafana(@RestPath long targetId, @RestPath long remoteId) + public String uploadToGrafana( + HttpServerResponse response, @RestPath long targetId, @RestPath long remoteId) throws Exception { - return recordingHelper.uploadToJFRDatasource(targetId, remoteId); + // Send an intermediate response back to the client while another thread handles the upload + // request + logger.info("Creating grafana upload request"); + GrafanaActiveUploadRequest request = + new GrafanaActiveUploadRequest(UUID.randomUUID().toString(), remoteId, targetId); + logger.info( + "Request created: (" + + request.getId() + + ", " + + request.getRemoteId() + + ", " + + request.getTargetId() + + ")"); + response.endHandler( + (e) -> bus.publish(ArchiveRequestGenerator.GRAFANA_ACTIVE_ADDRESS, request)); + return request.getId(); } public record LinkedRecordingDescriptor( diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java new file mode 100644 index 000000000..6ff4d341b --- /dev/null +++ b/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java @@ -0,0 +1,278 @@ +/* 
+ * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.recordings; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import io.cryostat.ConfigProperties; +import io.cryostat.core.reports.InterruptibleReportGenerator.AnalysisResult; +import io.cryostat.reports.ReportsService; +import io.cryostat.ws.MessagingServer; +import io.cryostat.ws.Notification; + +import io.quarkus.vertx.ConsumeEvent; +import io.vertx.mutiny.core.eventbus.EventBus; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class ArchiveRequestGenerator { + + public static final String ARCHIVE_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest"; + public static final String GRAFANA_ARCHIVE_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.GrafanaArchiveUploadRequest"; + public static final String GRAFANA_ACTIVE_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest"; + public static final String ARCHIVE_REPORT_ADDRESS = + 
"io.cryostat.recordings.ArchiveRequestGenerator.ArchiveReportRequest"; + public static final String ACTIVE_REPORT_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.ActiveReportRequest"; + private static final String ARCHIVE_RECORDING_SUCCESS = "ArchiveRecordingSuccess"; + private static final String ARCHIVE_RECORDING_FAIL = "ArchiveRecordingFailed"; + private static final String GRAFANA_UPLOAD_SUCCESS = "GrafanaUploadSuccess"; + private static final String GRAFANA_UPLOAD_FAIL = "GrafanaUploadFailed"; + private static final String REPORT_SUCCESS = "ReportSuccess"; + private static final String REPORT_FAILURE = "ReportFailure"; + private final ExecutorService executor; + private final Logger logger = LoggerFactory.getLogger(getClass()); + + @Inject private EventBus bus; + @Inject private RecordingHelper recordingHelper; + @Inject ReportsService reportsService; + + private Map> jobResults; + + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration timeout; + + public ArchiveRequestGenerator(ExecutorService executor) { + this.executor = executor; + this.jobResults = new ConcurrentHashMap<>(); + } + + public Future performArchive(ArchiveRequest request) { + Objects.requireNonNull(request.getRecording()); + return executor.submit( + () -> { + logger.info("Job ID: " + request.getId() + " submitted."); + try { + recordingHelper.archiveRecording(request.getRecording(), null, null).name(); + logger.info("Recording archived, firing notification"); + bus.publish( + MessagingServer.class.getName(), + new Notification( + ARCHIVE_RECORDING_SUCCESS, + Map.of("jobId", request.getId()))); + return request.getId(); + } catch (Exception e) { + logger.info("Archiving failed"); + bus.publish( + MessagingServer.class.getName(), + new Notification( + ARCHIVE_RECORDING_FAIL, Map.of("jobId", request.getId()))); + throw new CompletionException(e); + } + }); + } + + public Map getAnalysisResult(String jobID) { + return jobResults.get(jobID); + } + + 
@ConsumeEvent(value = ARCHIVE_ADDRESS) + public void onMessage(ArchiveRequest request) { + try { + performArchive(request); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + } + } + + @ConsumeEvent(value = GRAFANA_ARCHIVE_ADDRESS, blocking = true) + public void onMessage(GrafanaArchiveUploadRequest request) { + try { + logger.info("Job ID: " + request.getId() + " submitted."); + String result = + recordingHelper + .uploadToJFRDatasource(request.getPair()) + .await() + .atMost(timeout); + logger.info("Grafana upload complete, firing notification"); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_SUCCESS, Map.of("jobId", request.getId()))); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_FAIL, Map.of("jobId", request.getId()))); + } + } + + @ConsumeEvent(value = GRAFANA_ACTIVE_ADDRESS, blocking = true) + public void onMessage(GrafanaActiveUploadRequest request) { + try { + logger.info("Job ID: " + request.getId() + " submitted."); + String result = + recordingHelper + .uploadToJFRDatasource(request.getTargetId(), request.getRemoteId()) + .await() + .atMost(timeout); + logger.info("Grafana upload complete, firing notification"); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_SUCCESS, Map.of("jobId", request.getId()))); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_FAIL, Map.of("jobId", request.getId()))); + } + } + + @ConsumeEvent(value = ACTIVE_REPORT_ADDRESS, blocking = true) + public void onMessage(ActiveReportRequest request) { + try { + logger.info("Job ID: " + request.getId() + " submitted."); + Map result = + reportsService.reportFor(request.getRecording()).await().atMost(timeout); + 
logger.info("Report generation complete, firing notification"); + jobResults.put(request.getId(), result); + bus.publish(MessagingServer.class.getName(), new Notification(REPORT_SUCCESS, result)); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(REPORT_FAILURE, Map.of("jobId", request.getId()))); + } + } + + @ConsumeEvent(value = ARCHIVE_REPORT_ADDRESS, blocking = true) + public void onMessage(ArchivedReportRequest request) { + try { + logger.info("Job ID: " + request.getId() + " submitted."); + Map result = + reportsService + .reportFor(request.getPair().getKey(), request.getPair().getValue()) + .await() + .atMost(timeout); + logger.info("Report generation complete, firing notification"); + jobResults.put(request.getId(), result); + bus.publish(MessagingServer.class.getName(), new Notification(REPORT_SUCCESS, result)); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(REPORT_FAILURE, Map.of("jobId", request.getId()))); + } + } + + public record ArchiveRequest(String id, ActiveRecording recording) { + + public ArchiveRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(recording); + } + + public String getId() { + return id; + } + + public ActiveRecording getRecording() { + return recording; + } + } + + public record GrafanaArchiveUploadRequest(String id, Pair pair) { + + public GrafanaArchiveUploadRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(pair); + } + + public String getId() { + return id; + } + + public Pair getPair() { + return pair; + } + } + + public record GrafanaActiveUploadRequest(String id, long remoteId, long targetId) { + + public GrafanaActiveUploadRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(remoteId); + Objects.requireNonNull(targetId); + } + + public String getId() { + return id; + 
} + + public long getRemoteId() { + return remoteId; + } + + public long getTargetId() { + return targetId; + } + } + + public record ArchivedReportRequest(String id, Pair pair) { + + public ArchivedReportRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(pair); + } + + public String getId() { + return id; + } + + public Pair getPair() { + return pair; + } + } + + public record ActiveReportRequest(String id, ActiveRecording recording) { + + public ActiveReportRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(recording); + } + + public String getId() { + return id; + } + + public ActiveRecording getRecording() { + return recording; + } + } +} diff --git a/src/main/java/io/cryostat/recordings/ArchivedRecordings.java b/src/main/java/io/cryostat/recordings/ArchivedRecordings.java index 8d7302828..b594811cf 100644 --- a/src/main/java/io/cryostat/recordings/ArchivedRecordings.java +++ b/src/main/java/io/cryostat/recordings/ArchivedRecordings.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.Producers; @@ -34,6 +35,7 @@ import io.cryostat.libcryostat.sys.Clock; import io.cryostat.recordings.ActiveRecording.Listener.ArchivedRecordingEvent; import io.cryostat.recordings.ActiveRecordings.Metadata; +import io.cryostat.recordings.ArchiveRequestGenerator.GrafanaArchiveUploadRequest; import io.cryostat.targets.Target; import io.cryostat.util.HttpMimeType; import io.cryostat.ws.MessagingServer; @@ -42,7 +44,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.runtime.StartupEvent; import io.smallrye.common.annotation.Blocking; -import io.smallrye.mutiny.Uni; +import io.vertx.core.http.HttpServerResponse; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.handler.HttpException; import io.vertx.mutiny.core.eventbus.EventBus; @@ -473,7 +475,8 @@ public void 
deleteArchivedRecording(@RestPath String jvmId, @RestPath String fil @Blocking @Path("/api/v4/grafana/{encodedKey}") @RolesAllowed("write") - public Uni uploadArchivedToGrafana(@RestPath String encodedKey) throws Exception { + public String uploadArchivedToGrafana(HttpServerResponse response, @RestPath String encodedKey) + throws Exception { var pair = recordingHelper.decodedKey(encodedKey); var key = recordingHelper.archivedRecordingKey(pair); storage.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build()) @@ -487,7 +490,16 @@ public Uni uploadArchivedToGrafana(@RestPath String encodedKey) throws E if (!found) { throw new NotFoundException(); } - return recordingHelper.uploadToJFRDatasource(pair); + // Send an intermediate response back to the client while another thread handles the upload + // request + logger.info("Creating grafana upload request"); + GrafanaArchiveUploadRequest request = + new GrafanaArchiveUploadRequest(UUID.randomUUID().toString(), pair); + logger.info( + "Request created: (" + request.getId() + ", " + request.getPair().toString() + ")"); + response.endHandler( + (e) -> bus.publish(ArchiveRequestGenerator.GRAFANA_ARCHIVE_ADDRESS, request)); + return request.getId(); } @GET diff --git a/src/main/java/io/cryostat/recordings/RecordingHelper.java b/src/main/java/io/cryostat/recordings/RecordingHelper.java index dd9912491..c98f1e086 100644 --- a/src/main/java/io/cryostat/recordings/RecordingHelper.java +++ b/src/main/java/io/cryostat/recordings/RecordingHelper.java @@ -905,6 +905,7 @@ public ArchivedRecording archiveRecording( URI connectUrl = recording.target.connectUrl; + /* var event = new ArchivedRecordingEvent( ActiveRecordings.RecordingEventCategory.ARCHIVED_CREATED, @@ -912,7 +913,7 @@ public ArchivedRecording archiveRecording( bus.publish(event.category().category(), event.payload().recording()); bus.publish( MessagingServer.class.getName(), - new Notification(event.category().category(), event.payload())); + new 
Notification(event.category().category(), event.payload()));*/ } return new ArchivedRecording( recording.target.jvmId, diff --git a/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java b/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java index 433538ff2..fbc55895e 100644 --- a/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java +++ b/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java @@ -16,6 +16,7 @@ package io.cryostat.reports; import java.util.Map; +import java.util.Objects; import java.util.function.Predicate; import org.openjdk.jmc.flightrecorder.rules.IRule; @@ -27,6 +28,7 @@ import io.quarkus.cache.Cache; import io.quarkus.cache.CacheName; +import io.quarkus.cache.CaffeineCache; import io.smallrye.mutiny.Uni; import jakarta.annotation.Priority; import jakarta.decorator.Decorator; @@ -108,4 +110,18 @@ public Uni> reportFor(ActiveRecording recording) { public Uni> reportFor(String jvmId, String filename) { return reportFor(jvmId, filename, r -> true); } + + @Override + public boolean keyExists(ActiveRecording recording) { + String key = ReportsService.key(recording); + return !Objects.isNull(activeCache.as(CaffeineCache.class).getIfPresent(key)) + && !delegate.keyExists(recording); + } + + @Override + public boolean keyExists(String jvmId, String filename) { + String key = recordingHelper.archivedRecordingKey(jvmId, filename); + return !Objects.isNull(archivedCache.as(CaffeineCache.class).getIfPresent(key)) + && !delegate.keyExists(jvmId, filename); + } } diff --git a/src/main/java/io/cryostat/reports/Reports.java b/src/main/java/io/cryostat/reports/Reports.java index 5fd7e7e0f..c0809f9a2 100644 --- a/src/main/java/io/cryostat/reports/Reports.java +++ b/src/main/java/io/cryostat/reports/Reports.java @@ -15,25 +15,30 @@ */ package io.cryostat.reports; -import java.util.Map; +import java.time.Duration; +import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.StorageBuckets; 
-import io.cryostat.core.reports.InterruptibleReportGenerator.AnalysisResult; +import io.cryostat.recordings.ArchiveRequestGenerator; +import io.cryostat.recordings.ArchiveRequestGenerator.ActiveReportRequest; +import io.cryostat.recordings.ArchiveRequestGenerator.ArchivedReportRequest; import io.cryostat.recordings.RecordingHelper; import io.cryostat.targets.Target; import io.quarkus.runtime.StartupEvent; import io.smallrye.common.annotation.Blocking; -import io.smallrye.mutiny.Uni; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.security.RolesAllowed; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import jakarta.ws.rs.GET; import jakarta.ws.rs.NotFoundException; import jakarta.ws.rs.Path; -import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.RestPath; @@ -47,9 +52,14 @@ public class Reports { @ConfigProperty(name = ConfigProperties.ARCHIVED_REPORTS_STORAGE_CACHE_NAME) String bucket; + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration timeout; + + @Inject ArchiveRequestGenerator generator; @Inject StorageBuckets storageBuckets; @Inject RecordingHelper helper; @Inject ReportsService reportsService; + @Inject EventBus bus; @Inject Logger logger; // FIXME this observer cannot be declared on the StorageCachingReportsService decorator. @@ -63,27 +73,87 @@ void onStart(@Observes StartupEvent evt) { @GET @Blocking @Path("/api/v4/reports/{encodedKey}") - @Produces(MediaType.APPLICATION_JSON) @RolesAllowed("read") - public Uni> get(@RestPath String encodedKey) { + // Response isn't strongly typed which allows us to return either the Analysis result + // or a job ID String along with setting different Status codes. 
+ // TODO: Is there a cleaner way to accomplish this? + public Response get(HttpServerResponse response, @RestPath String encodedKey) { // TODO implement query parameter for evaluation predicate var pair = helper.decodedKey(encodedKey); - return reportsService.reportFor(pair.getKey(), pair.getValue()); + + // Check if we have a cached result already for this report + if (reportsService.keyExists(pair.getKey(), pair.getValue())) { + return Response.ok( + reportsService + .reportFor(pair.getKey(), pair.getValue()) + .await() + .atMost(timeout), + MediaType.APPLICATION_JSON) + .status(200) + .build(); + } + + // If we don't have a cached result, delegate to the ArchiveRequestGenerator + // and return the job ID with a location header. + logger.info("Cache miss. Creating archived reports request"); + ArchivedReportRequest request = + new ArchivedReportRequest(UUID.randomUUID().toString(), pair); + response.bodyEndHandler( + (e) -> bus.publish(ArchiveRequestGenerator.ARCHIVE_REPORT_ADDRESS, request)); + return Response.ok(request.getId()) + .status(202) + .location( + UriBuilder.fromUri( + String.format( + "/api/v4/targets/%d/reports/%d", + pair.getLeft(), pair.getRight())) + .build()) + .build(); } @GET @Blocking @Path("/api/v4/targets/{targetId}/reports/{recordingId}") - @Produces({MediaType.APPLICATION_JSON}) @RolesAllowed("read") - public Uni> getActive( - @RestPath long targetId, @RestPath long recordingId) throws Exception { + // Response isn't strongly typed which allows us to return either the Analysis result + // or a job ID String along with setting different Status codes. + // TODO: Is there a cleaner way to accomplish this? 
+ public Response getActive( + HttpServerResponse response, @RestPath long targetId, @RestPath long recordingId) + throws Exception { var target = Target.getTargetById(targetId); var recording = target.getRecordingById(recordingId); if (recording == null) { throw new NotFoundException(); } + + // Check if we've already cached a result for this report, return it if so + if (reportsService.keyExists(recording)) { + return Response.ok(reportsService.reportFor(recording).await().atMost(timeout)) + .status(200) + .build(); + } + + // If there isn't a cached result available, delegate to the ArchiveRequestGenerator + // and return the job ID with a location header. + logger.info("Cache miss. Creating active reports request"); + ActiveReportRequest request = + new ActiveReportRequest(UUID.randomUUID().toString(), recording); + // This doesn't get fired + response.bodyEndHandler( + (e) -> { + logger.info("Did we get here? Firing Active report event"); + bus.publish(ArchiveRequestGenerator.ACTIVE_REPORT_ADDRESS, request); + }); // TODO implement query parameter for evaluation predicate - return reportsService.reportFor(recording); + return Response.ok(request.getId()) + .status(202) + .location( + UriBuilder.fromUri( + String.format( + "/api/v4/targets/%d/reports/%d", + target.id, recordingId)) + .build()) + .build(); } } diff --git a/src/main/java/io/cryostat/reports/ReportsService.java b/src/main/java/io/cryostat/reports/ReportsService.java index 14a22764d..4d982d45b 100644 --- a/src/main/java/io/cryostat/reports/ReportsService.java +++ b/src/main/java/io/cryostat/reports/ReportsService.java @@ -39,4 +39,8 @@ Uni> reportFor( static String key(ActiveRecording recording) { return String.format("%s/%d", recording.target.jvmId, recording.id); } + + public boolean keyExists(ActiveRecording recording); + + public boolean keyExists(String jvmId, String filename); } diff --git a/src/main/java/io/cryostat/reports/ReportsServiceImpl.java 
b/src/main/java/io/cryostat/reports/ReportsServiceImpl.java index a11f0ec10..fe449c2ec 100644 --- a/src/main/java/io/cryostat/reports/ReportsServiceImpl.java +++ b/src/main/java/io/cryostat/reports/ReportsServiceImpl.java @@ -117,4 +117,14 @@ public ReportGenerationException(Throwable cause) { super(cause); } } + + @Override + public boolean keyExists(ActiveRecording recording) { + return false; + } + + @Override + public boolean keyExists(String jvmId, String filename) { + return false; + } } diff --git a/src/main/java/io/cryostat/reports/StorageCachingReportsService.java b/src/main/java/io/cryostat/reports/StorageCachingReportsService.java index 1a490960f..3ece7a152 100644 --- a/src/main/java/io/cryostat/reports/StorageCachingReportsService.java +++ b/src/main/java/io/cryostat/reports/StorageCachingReportsService.java @@ -64,6 +64,9 @@ class StorageCachingReportsService implements ReportsService { @ConfigProperty(name = ConfigProperties.ARCHIVED_REPORTS_EXPIRY_DURATION) Duration expiry; + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration timeout; + @Inject S3Client storage; @Inject RecordingHelper recordingHelper; @Inject ObjectMapper mapper; @@ -165,4 +168,16 @@ public Uni> reportFor(ActiveRecording recording) { public Uni> reportFor(String jvmId, String filename) { return reportFor(jvmId, filename, r -> true); } + + @Override + public boolean keyExists(ActiveRecording recording) { + String key = ReportsService.key(recording); + return checkStorage(key).await().atMost(timeout); + } + + @Override + public boolean keyExists(String jvmId, String filename) { + String key = recordingHelper.archivedRecordingKey(jvmId, filename); + return checkStorage(key).await().atMost(timeout); + } }