Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Add prototype for long running API calls for Archived Recordings #698

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion schema/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2131,7 +2131,7 @@ paths:
schema:
type: string
responses:
"200":
"202":
content:
text/plain:
schema:
Expand All @@ -2141,6 +2141,10 @@ paths:
description: Not Authorized
"403":
description: Not Allowed
"404":
description: Not Found
"502":
description: Target not Reachable
security:
- SecurityScheme: []
tags:
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/cryostat/Producers.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.cryostat.core.reports.InterruptibleReportGenerator;
import io.cryostat.libcryostat.sys.Clock;
import io.cryostat.libcryostat.sys.FileSystem;
import io.cryostat.recordings.ArchiveRequestGenerator;

import io.quarkus.arc.DefaultBean;
import io.vertx.mutiny.core.Vertx;
Expand Down Expand Up @@ -76,6 +77,15 @@ public static InterruptibleReportGenerator produceInterruptibleReportGenerator()
singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool());
}

@Produces
@RequestScoped
@DefaultBean
public static ArchiveRequestGenerator produceArchiveRequestGenerator() {
boolean singleThread = Runtime.getRuntime().availableProcessors() < 2;
return new ArchiveRequestGenerator(
singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool());
}

@Produces
@DefaultBean
public WebClient produceWebClient(Vertx vertx) {
Expand Down
43 changes: 30 additions & 13 deletions src/main/java/io/cryostat/recordings/ActiveRecordings.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.cryostat.recordings;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
Expand All @@ -25,10 +24,12 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import io.cryostat.ConfigProperties;
import io.cryostat.libcryostat.templates.Template;
import io.cryostat.libcryostat.templates.TemplateType;
import io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest;
import io.cryostat.recordings.RecordingHelper.RecordingOptions;
import io.cryostat.recordings.RecordingHelper.RecordingReplace;
import io.cryostat.targets.Target;
Expand All @@ -37,6 +38,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
Expand Down Expand Up @@ -64,6 +66,8 @@ public class ActiveRecordings {

@Inject ObjectMapper mapper;
@Inject RecordingHelper recordingHelper;
@Inject ArchiveRequestGenerator generator;
@Inject EventBus bus;
@Inject Logger logger;

@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
Expand Down Expand Up @@ -119,18 +123,31 @@ public String patch(@RestPath long targetId, @RestPath long remoteId, String bod
.atMost(connectionFailedTimeout);
return null;
case "save":
try {
// FIXME this operation might take a long time to complete, depending on the
// amount of JFR data in the target and the speed of the connection between the
// target and Cryostat. We should not make the client wait until this operation
// completes before sending a response - it should be async. Here we should just
// return an Accepted response, and if a failure occurs that should be indicated
// as a websocket notification.
return recordingHelper.archiveRecording(activeRecording, null, null).name();
} catch (IOException ioe) {
logger.warn(ioe);
return null;
}
// FIXME this operation might take a long time to complete, depending on the
// amount of JFR data in the target and the speed of the connection between the
// target and Cryostat. We should not make the client wait until this operation
// completes before sending a response - it should be async. Here we should just
// return an Accepted response, and if a failure occurs that should be indicated
// as a websocket notification.

/*
* Desired workflow:
* Client sends a PATCH request to Cryostat
* Cryostat receives the PATCH and checks that the specified active recording exists and that the target JVM is reachable (ex. try to open a connection and do something relatively lightweight like compute its JVM ID). If this check succeeds respond to the PATCH with 202, if it fails respond with a 404 (recording not found) or 502 (target not reachable) etc.
* In the background, Cryostat creates the S3 file upload request, opens a target connection, pipes the bytes, etc. - same as steps 2-5 above
* Cryostat emits a WebSocket notification, either indicating task successful completion or task failure.
*/
logger.info("Ceating request");
ArchiveRequest request =
new ArchiveRequest(UUID.randomUUID().toString(), activeRecording);
logger.info(
"Request created: ("
+ request.getId()
+ ", "
+ request.getRecording().name
+ ")");
bus.publish(ArchiveRequestGenerator.ARCHIVE_ADDRESS, request);
return request.getId();
Josh-Matsuoka marked this conversation as resolved.
Show resolved Hide resolved
default:
throw new BadRequestException(body);
}
Expand Down
99 changes: 99 additions & 0 deletions src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright The Cryostat Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cryostat.recordings;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import io.cryostat.ws.MessagingServer;
import io.cryostat.ws.Notification;

import io.quarkus.vertx.ConsumeEvent;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class ArchiveRequestGenerator {

public static final String ARCHIVE_ADDRESS = "io.cryostat.recordings.ArchiveRequestGenerator";
private static final String ARCHIVE_RECORDING_SUCCESS = "ArchiveRecordingSuccess";
private static final String ARCHIVE_RECORDING_FAIL = "ArchiveRecordingFailed";
private final ExecutorService executor;
private final Logger logger = LoggerFactory.getLogger(getClass());

@Inject private EventBus bus;
@Inject private RecordingHelper recordingHelper;

public ArchiveRequestGenerator(ExecutorService executor) {
this.executor = executor;
Josh-Matsuoka marked this conversation as resolved.
Show resolved Hide resolved
}

public Future<String> performArchive(ArchiveRequest request) {
Objects.requireNonNull(request.getRecording());
return executor.submit(
() -> {
logger.info("Job ID: " + request.getId() + " submitted.");
try {
recordingHelper.archiveRecording(request.getRecording(), null, null).name();
logger.info("Recording archived, firing notification");
bus.publish(
MessagingServer.class.getName(),
new Notification(
ARCHIVE_RECORDING_SUCCESS,
Map.of("jobId", request.getId())));
return request.getId();
} catch (Exception e) {
logger.info("Archiving failed");
bus.publish(
MessagingServer.class.getName(),
new Notification(
ARCHIVE_RECORDING_FAIL, Map.of("jobId", request.getId())));
throw new CompletionException(e);
}
});
}

@ConsumeEvent(value = ARCHIVE_ADDRESS)
Josh-Matsuoka marked this conversation as resolved.
Show resolved Hide resolved
public void onMessage(ArchiveRequest request) {
try {
performArchive(request);
} catch (Exception e) {
logger.warn("Exception thrown while servicing request: ", e);
}
}

public record ArchiveRequest(String id, ActiveRecording recording) {

public ArchiveRequest {
Objects.requireNonNull(id);
Objects.requireNonNull(recording);
}

public String getId() {
return id;
}

public ActiveRecording getRecording() {
return recording;
}
}
}
3 changes: 2 additions & 1 deletion src/main/java/io/cryostat/recordings/RecordingHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -905,14 +905,15 @@ public ArchivedRecording archiveRecording(

URI connectUrl = recording.target.connectUrl;

/*
var event =
new ArchivedRecordingEvent(
ActiveRecordings.RecordingEventCategory.ARCHIVED_CREATED,
ArchivedRecordingEvent.Payload.of(connectUrl, archivedRecording));
bus.publish(event.category().category(), event.payload().recording());
bus.publish(
MessagingServer.class.getName(),
new Notification(event.category().category(), event.payload()));
new Notification(event.category().category(), event.payload()));*/
}
return new ArchivedRecording(
recording.target.jvmId,
Expand Down
Loading