Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Add prototype for long running API calls for Archived Recordings #698

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion schema/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2131,7 +2131,7 @@ paths:
schema:
type: string
responses:
"200":
"202":
content:
text/plain:
schema:
Expand All @@ -2141,6 +2141,10 @@ paths:
description: Not Authorized
"403":
description: Not Allowed
"404":
description: Not Found
"502":
description: Target not Reachable
security:
- SecurityScheme: []
tags:
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/cryostat/Producers.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.cryostat.core.reports.InterruptibleReportGenerator;
import io.cryostat.libcryostat.sys.Clock;
import io.cryostat.libcryostat.sys.FileSystem;
import io.cryostat.recordings.ArchiveRequestGenerator;

import io.quarkus.arc.DefaultBean;
import io.vertx.mutiny.core.Vertx;
Expand Down Expand Up @@ -76,6 +77,15 @@ public static InterruptibleReportGenerator produceInterruptibleReportGenerator()
singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool());
}

@Produces
@RequestScoped
@DefaultBean
public static ArchiveRequestGenerator produceArchiveRequestGenerator() {
boolean singleThread = Runtime.getRuntime().availableProcessors() < 2;
return new ArchiveRequestGenerator(
singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool());
}

@Produces
@DefaultBean
public WebClient produceWebClient(Vertx vertx) {
Expand Down
64 changes: 48 additions & 16 deletions src/main/java/io/cryostat/recordings/ActiveRecordings.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.cryostat.recordings;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
Expand All @@ -25,18 +24,21 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import io.cryostat.ConfigProperties;
import io.cryostat.libcryostat.templates.Template;
import io.cryostat.libcryostat.templates.TemplateType;
import io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest;
import io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest;
import io.cryostat.recordings.RecordingHelper.RecordingOptions;
import io.cryostat.recordings.RecordingHelper.RecordingReplace;
import io.cryostat.targets.Target;

import com.fasterxml.jackson.databind.ObjectMapper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
Expand Down Expand Up @@ -64,6 +66,8 @@ public class ActiveRecordings {

@Inject ObjectMapper mapper;
@Inject RecordingHelper recordingHelper;
@Inject ArchiveRequestGenerator generator;
@Inject EventBus bus;
@Inject Logger logger;

@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
Expand Down Expand Up @@ -119,18 +123,31 @@ public String patch(@RestPath long targetId, @RestPath long remoteId, String bod
.atMost(connectionFailedTimeout);
return null;
case "save":
try {
// FIXME this operation might take a long time to complete, depending on the
// amount of JFR data in the target and the speed of the connection between the
// target and Cryostat. We should not make the client wait until this operation
// completes before sending a response - it should be async. Here we should just
// return an Accepted response, and if a failure occurs that should be indicated
// as a websocket notification.
return recordingHelper.archiveRecording(activeRecording, null, null).name();
} catch (IOException ioe) {
logger.warn(ioe);
return null;
}
// FIXME this operation might take a long time to complete, depending on the
// amount of JFR data in the target and the speed of the connection between the
// target and Cryostat. We should not make the client wait until this operation
// completes before sending a response - it should be async. Here we should just
// return an Accepted response, and if a failure occurs that should be indicated
// as a websocket notification.

/*
* Desired workflow:
* Client sends a PATCH request to Cryostat
* Cryostat receives the PATCH and checks that the specified active recording exists and that the target JVM is reachable (ex. try to open a connection and do something relatively lightweight like compute its JVM ID). If this check succeeds respond to the PATCH with 202, if it fails respond with a 404 (recording not found) or 502 (target not reachable) etc.
* In the background, Cryostat creates the S3 file upload request, opens a target connection, pipes the bytes, etc. - same as steps 2-5 above
* Cryostat emits a WebSocket notification, either indicating task successful completion or task failure.
*/
logger.info("Ceating request");
ArchiveRequest request =
new ArchiveRequest(UUID.randomUUID().toString(), activeRecording);
logger.info(
"Request created: ("
+ request.getId()
+ ", "
+ request.getRecording().name
+ ")");
bus.publish(ArchiveRequestGenerator.ARCHIVE_ADDRESS, request);
return request.getId();
Josh-Matsuoka marked this conversation as resolved.
Show resolved Hide resolved
default:
throw new BadRequestException(body);
}
Expand Down Expand Up @@ -220,9 +237,24 @@ public void delete(@RestPath long targetId, @RestPath long remoteId) throws Exce
@Blocking
@Path("/{remoteId}/upload")
@RolesAllowed("write")
public Uni<String> uploadToGrafana(@RestPath long targetId, @RestPath long remoteId)
public String uploadToGrafana(@RestPath long targetId, @RestPath long remoteId)
throws Exception {
return recordingHelper.uploadToJFRDatasource(targetId, remoteId);
// Send an intermediate response back to the client while another thread handles the upload
// request
logger.info("Creating grafana upload request");
GrafanaActiveUploadRequest request =
new GrafanaActiveUploadRequest(UUID.randomUUID().toString(), remoteId, targetId);
logger.info(
"Request created: ("
+ request.getId()
+ ", "
+ request.getRemoteId()
+ ", "
+ request.getTargetId()
+ ")");
bus.publish(ArchiveRequestGenerator.GRAFANA_ACTIVE_ADDRESS, request);
return request.getId();
// return recordingHelper.uploadToJFRDatasource(targetId, remoteId);
}

public record LinkedRecordingDescriptor(
Expand Down
191 changes: 191 additions & 0 deletions src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Copyright The Cryostat Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cryostat.recordings;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import io.cryostat.ConfigProperties;
import io.cryostat.ws.MessagingServer;
import io.cryostat.ws.Notification;

import io.quarkus.vertx.ConsumeEvent;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class ArchiveRequestGenerator {

public static final String ARCHIVE_ADDRESS = "io.cryostat.recordings.ArchiveRequestGenerator";
public static final String GRAFANA_ARCHIVE_ADDRESS =
"io.cryostat.recordings.ArchiveRequestGenerator.GrafanaArchiveUploadRequest";
public static final String GRAFANA_ACTIVE_ADDRESS =
"io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest";
Josh-Matsuoka marked this conversation as resolved.
Show resolved Hide resolved
private static final String ARCHIVE_RECORDING_SUCCESS = "ArchiveRecordingSuccess";
private static final String ARCHIVE_RECORDING_FAIL = "ArchiveRecordingFailed";
private static final String GRAFANA_UPLOAD_SUCCESS = "GrafanaUploadSuccess";
private static final String GRAFANA_UPLOAD_FAIL = "GrafanaUploadFailed";
private final ExecutorService executor;
private final Logger logger = LoggerFactory.getLogger(getClass());

@Inject private EventBus bus;
@Inject private RecordingHelper recordingHelper;

@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
Duration timeout;

public ArchiveRequestGenerator(ExecutorService executor) {
this.executor = executor;
Josh-Matsuoka marked this conversation as resolved.
Show resolved Hide resolved
}

public Future<String> performArchive(ArchiveRequest request) {
Objects.requireNonNull(request.getRecording());
return executor.submit(
() -> {
logger.info("Job ID: " + request.getId() + " submitted.");
try {
recordingHelper.archiveRecording(request.getRecording(), null, null).name();
logger.info("Recording archived, firing notification");
bus.publish(
MessagingServer.class.getName(),
new Notification(
ARCHIVE_RECORDING_SUCCESS,
Map.of("jobId", request.getId())));
return request.getId();
} catch (Exception e) {
logger.info("Archiving failed");
bus.publish(
MessagingServer.class.getName(),
new Notification(
ARCHIVE_RECORDING_FAIL, Map.of("jobId", request.getId())));
throw new CompletionException(e);
}
});
}

@ConsumeEvent(value = ARCHIVE_ADDRESS)
Josh-Matsuoka marked this conversation as resolved.
Show resolved Hide resolved
public void onMessage(ArchiveRequest request) {
try {
performArchive(request);
} catch (Exception e) {
logger.warn("Exception thrown while servicing request: ", e);
}
}

@ConsumeEvent(value = GRAFANA_ARCHIVE_ADDRESS)
public void onMessage(GrafanaArchiveUploadRequest request) {
try {
logger.info("Job ID: " + request.getId() + " submitted.");
String result =
recordingHelper
.uploadToJFRDatasource(request.getPair())
.await()
Josh-Matsuoka marked this conversation as resolved.
Show resolved Hide resolved
.atMost(timeout);
logger.info("Grafana upload complete, firing notification");
bus.publish(
MessagingServer.class.getName(),
new Notification(GRAFANA_UPLOAD_SUCCESS, Map.of("jobId", request.getId())));
} catch (Exception e) {
logger.warn("Exception thrown while servicing request: ", e);
bus.publish(
MessagingServer.class.getName(),
new Notification(GRAFANA_UPLOAD_FAIL, Map.of("jobId", request.getId())));
}
}

@ConsumeEvent(value = GRAFANA_ACTIVE_ADDRESS)
public void onMessage(GrafanaActiveUploadRequest request) {
try {
logger.info("Job ID: " + request.getId() + " submitted.");
String result =
recordingHelper
.uploadToJFRDatasource(request.getTargetId(), request.getRemoteId())
.await()
.atMost(timeout);
logger.info("Grafana upload complete, firing notification");
bus.publish(
MessagingServer.class.getName(),
new Notification(GRAFANA_UPLOAD_SUCCESS, Map.of("jobId", request.getId())));
} catch (Exception e) {
logger.warn("Exception thrown while servicing request: ", e);
bus.publish(
MessagingServer.class.getName(),
new Notification(GRAFANA_UPLOAD_FAIL, Map.of("jobId", request.getId())));
}
}

public record ArchiveRequest(String id, ActiveRecording recording) {

public ArchiveRequest {
Objects.requireNonNull(id);
Objects.requireNonNull(recording);
}

public String getId() {
return id;
}

public ActiveRecording getRecording() {
return recording;
}
}

public record GrafanaArchiveUploadRequest(String id, Pair<String, String> pair) {
Josh-Matsuoka marked this conversation as resolved.
Show resolved Hide resolved

public GrafanaArchiveUploadRequest {
Objects.requireNonNull(id);
Objects.requireNonNull(pair);
}

public String getId() {
return id;
}

public Pair<String, String> getPair() {
return pair;
}
}

public record GrafanaActiveUploadRequest(String id, long remoteId, long targetId) {

public GrafanaActiveUploadRequest {
Objects.requireNonNull(id);
Objects.requireNonNull(remoteId);
Objects.requireNonNull(targetId);
}

public String getId() {
return id;
}

public long getRemoteId() {
return remoteId;
}

public long getTargetId() {
return targetId;
}
}
}
15 changes: 12 additions & 3 deletions src/main/java/io/cryostat/recordings/ArchivedRecordings.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import io.cryostat.ConfigProperties;
import io.cryostat.Producers;
import io.cryostat.StorageBuckets;
import io.cryostat.libcryostat.sys.Clock;
import io.cryostat.recordings.ActiveRecording.Listener.ArchivedRecordingEvent;
import io.cryostat.recordings.ActiveRecordings.Metadata;
import io.cryostat.recordings.ArchiveRequestGenerator.GrafanaArchiveUploadRequest;
import io.cryostat.targets.Target;
import io.cryostat.util.HttpMimeType;
import io.cryostat.ws.MessagingServer;
Expand All @@ -42,7 +44,6 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.handler.HttpException;
import io.vertx.mutiny.core.eventbus.EventBus;
Expand Down Expand Up @@ -473,7 +474,7 @@ public void deleteArchivedRecording(@RestPath String jvmId, @RestPath String fil
@Blocking
@Path("/api/v4/grafana/{encodedKey}")
@RolesAllowed("write")
public Uni<String> uploadArchivedToGrafana(@RestPath String encodedKey) throws Exception {
public String uploadArchivedToGrafana(@RestPath String encodedKey) throws Exception {
var pair = recordingHelper.decodedKey(encodedKey);
var key = recordingHelper.archivedRecordingKey(pair);
storage.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build())
Expand All @@ -487,7 +488,15 @@ public Uni<String> uploadArchivedToGrafana(@RestPath String encodedKey) throws E
if (!found) {
throw new NotFoundException();
}
return recordingHelper.uploadToJFRDatasource(pair);
// Send an intermediate response back to the client while another thread handles the upload
// request
logger.info("Creating grafana upload request");
GrafanaArchiveUploadRequest request =
new GrafanaArchiveUploadRequest(UUID.randomUUID().toString(), pair);
logger.info(
"Request created: (" + request.getId() + ", " + request.getPair().toString() + ")");
bus.publish(ArchiveRequestGenerator.GRAFANA_ARCHIVE_ADDRESS, request);
return request.getId();
}

@GET
Expand Down
Loading
Loading