Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Add prototype for long running API calls for Archived Recordings #698

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions schema/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,7 @@ paths:
schema:
type: string
responses:
"200":
"202":
content:
text/plain:
schema:
Expand Down Expand Up @@ -1457,6 +1457,12 @@ paths:
schema:
type: string
responses:
"202":
content:
text/plain:
schema:
type: string
description: OK
"200":
content:
application/json:
Expand Down Expand Up @@ -2131,7 +2137,7 @@ paths:
schema:
type: string
responses:
"200":
"202":
content:
text/plain:
schema:
Expand All @@ -2141,6 +2147,10 @@ paths:
description: Not Authorized
"403":
description: Not Allowed
"404":
description: Not Found
"502":
description: Target not Reachable
security:
- SecurityScheme: []
tags:
Expand All @@ -2161,7 +2171,7 @@ paths:
format: int64
type: integer
responses:
"200":
"202":
content:
text/plain:
schema:
Expand Down Expand Up @@ -2199,6 +2209,12 @@ paths:
$ref: '#/components/schemas/AnalysisResult'
type: object
description: OK
"202":
content:
text/plain:
schema:
type: string
description: OK
"401":
description: Not Authorized
"403":
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/cryostat/Producers.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.cryostat.core.reports.InterruptibleReportGenerator;
import io.cryostat.libcryostat.sys.Clock;
import io.cryostat.libcryostat.sys.FileSystem;
import io.cryostat.recordings.ArchiveRequestGenerator;

import io.quarkus.arc.DefaultBean;
import io.vertx.mutiny.core.Vertx;
Expand Down Expand Up @@ -76,6 +77,15 @@ public static InterruptibleReportGenerator produceInterruptibleReportGenerator()
singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool());
}

@Produces
@RequestScoped
@DefaultBean
public static ArchiveRequestGenerator produceArchiveRequestGenerator() {
boolean singleThread = Runtime.getRuntime().availableProcessors() < 2;
return new ArchiveRequestGenerator(
singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool());
}

@Produces
@DefaultBean
public WebClient produceWebClient(Vertx vertx) {
Expand Down
73 changes: 56 additions & 17 deletions src/main/java/io/cryostat/recordings/ActiveRecordings.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.cryostat.recordings;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
Expand All @@ -25,18 +24,22 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import io.cryostat.ConfigProperties;
import io.cryostat.libcryostat.templates.Template;
import io.cryostat.libcryostat.templates.TemplateType;
import io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest;
import io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest;
import io.cryostat.recordings.RecordingHelper.RecordingOptions;
import io.cryostat.recordings.RecordingHelper.RecordingReplace;
import io.cryostat.targets.Target;

import com.fasterxml.jackson.databind.ObjectMapper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
Expand Down Expand Up @@ -64,6 +67,8 @@ public class ActiveRecordings {

@Inject ObjectMapper mapper;
@Inject RecordingHelper recordingHelper;
@Inject ArchiveRequestGenerator generator;
@Inject EventBus bus;
@Inject Logger logger;

@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
Expand Down Expand Up @@ -100,7 +105,11 @@ public RestResponse<InputStream> download(@RestPath long targetId, @RestPath lon
@Blocking
@Path("/{remoteId}")
@RolesAllowed("write")
public String patch(@RestPath long targetId, @RestPath long remoteId, String body)
public String patch(
HttpServerResponse response,
@RestPath long targetId,
@RestPath long remoteId,
String body)
throws Exception {
Target target = Target.find("id", targetId).singleResult();
Optional<ActiveRecording> recording =
Expand All @@ -119,18 +128,32 @@ public String patch(@RestPath long targetId, @RestPath long remoteId, String bod
.atMost(connectionFailedTimeout);
return null;
case "save":
try {
// FIXME this operation might take a long time to complete, depending on the
// amount of JFR data in the target and the speed of the connection between the
// target and Cryostat. We should not make the client wait until this operation
// completes before sending a response - it should be async. Here we should just
// return an Accepted response, and if a failure occurs that should be indicated
// as a websocket notification.
return recordingHelper.archiveRecording(activeRecording, null, null).name();
} catch (IOException ioe) {
logger.warn(ioe);
return null;
}
// FIXME this operation might take a long time to complete, depending on the
// amount of JFR data in the target and the speed of the connection between the
// target and Cryostat. We should not make the client wait until this operation
// completes before sending a response - it should be async. Here we should just
// return an Accepted response, and if a failure occurs that should be indicated
// as a websocket notification.

/*
* Desired workflow:
* Client sends a PATCH request to Cryostat
* Cryostat receives the PATCH and checks that the specified active recording exists and that the target JVM is reachable (ex. try to open a connection and do something relatively lightweight like compute its JVM ID). If this check succeeds respond to the PATCH with 202, if it fails respond with a 404 (recording not found) or 502 (target not reachable) etc.
* In the background, Cryostat creates the S3 file upload request, opens a target connection, pipes the bytes, etc. - same as steps 2-5 above
* Cryostat emits a WebSocket notification, either indicating task successful completion or task failure.
*/
logger.info("Ceating request");
ArchiveRequest request =
new ArchiveRequest(UUID.randomUUID().toString(), activeRecording);
logger.info(
"Request created: ("
+ request.getId()
+ ", "
+ request.getRecording().name
+ ")");
response.endHandler(
(e) -> bus.publish(ArchiveRequestGenerator.ARCHIVE_ADDRESS, request));
return request.getId();
Josh-Matsuoka marked this conversation as resolved.
Show resolved Hide resolved
default:
throw new BadRequestException(body);
}
Expand Down Expand Up @@ -220,9 +243,25 @@ public void delete(@RestPath long targetId, @RestPath long remoteId) throws Exce
@Blocking
@Path("/{remoteId}/upload")
@RolesAllowed("write")
public Uni<String> uploadToGrafana(@RestPath long targetId, @RestPath long remoteId)
public String uploadToGrafana(
HttpServerResponse response, @RestPath long targetId, @RestPath long remoteId)
throws Exception {
return recordingHelper.uploadToJFRDatasource(targetId, remoteId);
// Send an intermediate response back to the client while another thread handles the upload
// request
logger.info("Creating grafana upload request");
GrafanaActiveUploadRequest request =
new GrafanaActiveUploadRequest(UUID.randomUUID().toString(), remoteId, targetId);
logger.info(
"Request created: ("
+ request.getId()
+ ", "
+ request.getRemoteId()
+ ", "
+ request.getTargetId()
+ ")");
response.endHandler(
(e) -> bus.publish(ArchiveRequestGenerator.GRAFANA_ACTIVE_ADDRESS, request));
return request.getId();
}

public record LinkedRecordingDescriptor(
Expand Down
Loading
Loading